diff --git a/backend/dev.sh b/backend/dev.sh index 1186cd43..bf4043c6 100755 --- a/backend/dev.sh +++ b/backend/dev.sh @@ -6,4 +6,4 @@ set -e MOONSTREAM_HOST="${MOONSTREAM_HOST:-0.0.0.0}" MOONSTREAM_PORT="${MOONSTREAM_PORT:-7481}" -uvicorn --port "$MOONSTREAM_PORT" --host "$MOONSTREAM_HOST" moonstream.api:app --workers 2 $@ +uvicorn --port "$MOONSTREAM_PORT" --host "$MOONSTREAM_HOST" moonstream.api:app --reload diff --git a/backend/moonstream/actions.py b/backend/moonstream/actions.py index 9ee8130d..ab133eb2 100644 --- a/backend/moonstream/actions.py +++ b/backend/moonstream/actions.py @@ -1,268 +1,18 @@ -from datetime import datetime import logging +from typing import List, Optional -from typing import Dict, Any, List, Optional, Union - -from sqlalchemy.engine.base import Transaction from moonstreamdb.models import ( - EthereumBlock, - EthereumTransaction, - EthereumPendingTransaction, EthereumAddress, EthereumLabel, ) -from sqlalchemy import or_, and_, text from sqlalchemy.orm import Session from . import data -from .settings import DEFAULT_STREAM_TIMEINTERVAL - - logger = logging.getLogger(__name__) -async def get_transaction_in_blocks( - db_session: Session, - query: str, - user_subscriptions_resources_by_address: Dict[str, Any], - boundaries: data.PageBoundary, -) -> data.EthereumTransactionResponse: - - """ - Request transactions from database based on addresses from user subscriptions - and selected boundaries. - - streams empty for user without subscriptions - Return last available transaction if boundaries is empty - - """ - - subscriptions_addresses = list(user_subscriptions_resources_by_address.keys()) - - if boundaries.start_time < 1438215988: # first block - boundaries.start_time = 0 - - if boundaries.end_time < 1438215988: # first block - boundaries.end_time = 0 - - if query == "" or query == " ": - - filters = [ - or_( - EthereumTransaction.to_address == address, - EthereumTransaction.from_address == address, - ) - for address in subscriptions_addresses - ] - filters = or_(*filters) - - else: - filters = parse_search_query_to_sqlalchemy_filters( - query, allowed_addresses=subscriptions_addresses - ) - if not filters: - return data.EthereumTransactionResponse( - stream=[], - boundaries=boundaries, - ) - filters = and_(*filters) - - ethereum_transactions_in_subscriptions = ( - db_session.query( - EthereumTransaction.hash, - EthereumTransaction.block_number, - EthereumTransaction.from_address, - EthereumTransaction.to_address, - EthereumTransaction.gas, - EthereumTransaction.gas_price, - EthereumTransaction.input, - EthereumTransaction.nonce, - EthereumTransaction.value, - EthereumBlock.timestamp.label("timestamp"), - ) - .join(EthereumBlock) - .filter(filters) - ) - - ethereum_transactions = ethereum_transactions_in_subscriptions - - # If not start_time and end_time not present - # Get latest transaction - if boundaries.end_time == 0: - ethereum_transaction_start_point = ( - ethereum_transactions_in_subscriptions.order_by( - text("timestamp desc") - ).limit(1) - ).one_or_none() - if ethereum_transaction_start_point: - boundaries.end_time = ethereum_transaction_start_point[-1] - boundaries.start_time = ( - ethereum_transaction_start_point[-1] - DEFAULT_STREAM_TIMEINTERVAL - ) - - if boundaries.start_time != 0 and boundaries.end_time != 0: - if boundaries.start_time > boundaries.end_time: - boundaries.start_time, boundaries.end_time = ( - boundaries.end_time, - boundaries.start_time, - ) - - if boundaries.end_time: - ethereum_transactions = ethereum_transactions.filter( - include_or_not_lower( - EthereumBlock.timestamp, boundaries.include_end, boundaries.end_time - ) - ) - - next_transaction = ( - ethereum_transactions_in_subscriptions.filter( - EthereumBlock.timestamp > boundaries.end_time - ) - .order_by(text("timestamp ASC")) - .limit(1) - ) - - next_transaction = next_transaction.one_or_none() - - if next_transaction: - boundaries.next_event_time = next_transaction[-1] - else: - boundaries.next_event_time = None - - if boundaries.start_time: - ethereum_transactions = ethereum_transactions.filter( - include_or_not_grater( - EthereumBlock.timestamp, - boundaries.include_start, - boundaries.start_time, - ) - ) - - previous_transaction = ( - ethereum_transactions_in_subscriptions.filter( - EthereumBlock.timestamp < boundaries.start_time - ) - .order_by(text("timestamp desc")) - .limit(1) - ).one_or_none() - - if previous_transaction: - boundaries.previous_event_time = previous_transaction[-1] - else: - boundaries.previous_event_time = None - - response = [] - for ( - hash, - block_number, - from_address, - to_address, - gas, - gas_price, - input, - nonce, - value, - timestamp, - ) in ethereum_transactions: - - # Apply subscription data to each transaction - subscription_type_id = None - from_label = None - to_label = None - color = None - - if from_address in subscriptions_addresses: - from_label = user_subscriptions_resources_by_address[from_address]["label"] - subscription_type_id = user_subscriptions_resources_by_address[ - from_address - ]["subscription_type_id"] - color = user_subscriptions_resources_by_address[from_address]["color"] - - if to_address in subscriptions_addresses: - subscription_type_id = user_subscriptions_resources_by_address[to_address][ - "subscription_type_id" - ] - to_label = user_subscriptions_resources_by_address[to_address]["label"] - color = user_subscriptions_resources_by_address[to_address]["color"] - - response.append( - data.EthereumTransactionItem( - color=color, - from_label=from_label, - to_label=to_label, - block_number=block_number, - gas=gas, - gasPrice=gas_price, - value=value, - from_address=from_address, - to_address=to_address, - hash=hash, - input=input, - nonce=nonce, - timestamp=timestamp, - subscription_type_id=subscription_type_id, - ) - ) - - return data.EthereumTransactionResponse(stream=response, boundaries=boundaries) - - -def include_or_not_grater(value1, include, value2): - if include: - return value1 >= value2 - else: - return value1 > value2 - - -def include_or_not_lower(value1, include, value2): - if include: - return value1 <= value2 - else: - return value1 < value2 - - -def parse_search_query_to_sqlalchemy_filters(q: str, allowed_addresses: List[str]): - - """ - Return list of sqlalchemy filters or empty list - """ - - filters = q.split("+") - constructed_filters = [] - for filter_item in filters: - if filter_item == "": - logger.warning("Skipping empty filter item") - continue - - # Try Google style search filters - components = filter_item.split(":") - if len(components) == 2: - filter_type = components[0] - filter_value = components[1] - else: - continue - - if filter_type == "to" and filter_value: - constructed_filters.append(EthereumTransaction.to_address == filter_value) - - if filter_type == "from" and filter_value: - if filter_value not in allowed_addresses: - continue - constructed_filters.append(EthereumTransaction.from_address == filter_value) - - if filter_type == "address" and filter_value: - constructed_filters.append( - or_( - EthereumTransaction.to_address == filter_value, - EthereumTransaction.from_address == filter_value, - ) - ) - - return constructed_filters - - def get_address_labels( db_session: Session, start: int, limit: int, addresses: Optional[List[str]] = None ) -> List[EthereumAddress]: diff --git a/backend/moonstream/data.py b/backend/moonstream/data.py index a55d4ab1..9c38619d 100644 --- a/backend/moonstream/data.py +++ b/backend/moonstream/data.py @@ -127,28 +127,17 @@ class StreamBoundary(BaseModel): include_end: bool = False -class PageBoundary(StreamBoundary): - """ - A PageBoundary adds information about previous and subsequent events to a StreamBoundary. - - This additional information helps callers manage their views into a stream. - """ - - next_event_time: Optional[int] = None - previous_event_time: Optional[int] = None - - -class EthereumTransactionResponse(BaseModel): - stream: List[EthereumTransactionItem] - boundaries: Optional[PageBoundary] - - class Event(BaseModel): event_type: str event_timestamp: int # Seconds since epoch event_data: Dict[str, Any] = Field(default_factory=dict) +class GetEventsResponse(BaseModel): + stream_boundary: StreamBoundary + events: List[Event] = Field(default_factory=list) + + class TxinfoEthereumBlockchainRequest(BaseModel): tx: EthereumTransaction diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index 93345801..00669474 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -11,6 +11,7 @@ from moonstreamdb.models import ( ) from sqlalchemy import or_, and_, text from sqlalchemy.orm import Session, Query +from sqlalchemy.sql.functions import user from .. import data from ..settings import DEFAULT_STREAM_TIMEINTERVAL @@ -93,9 +94,9 @@ def parse_filters( return None subscribed_addresses = { - subscription.get("address") + subscription.resource_data.get("address") for subscription in provider_subscriptions - if subscription.get("address") is not None + if subscription.resource_data.get("address") is not None } requires_ethereum_blockchain_data = False @@ -222,6 +223,7 @@ def get_events( data_access_token: str, stream_boundary: data.StreamBoundary, query: StreamQuery, + user_subscriptions: Dict[str, List[Dict[str, Any]]], ) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]: """ Returns ethereum_blockchain events for the given addresses in the time period represented @@ -229,7 +231,7 @@ def get_events( If the query does not require any data from this provider, returns None. """ - parsed_filters = parse_filters(query) + parsed_filters = parse_filters(query, user_subscriptions) if parsed_filters is None: return None @@ -257,6 +259,7 @@ def latest_events( data_access_token: str, query: StreamQuery, num_events: int, + user_subscriptions: Dict[str, List[Dict[str, Any]]], ) -> Optional[List[data.Event]]: """ Returns the num_events latest events from the current provider, subject to the constraints imposed @@ -269,7 +272,7 @@ def latest_events( stream_boundary = data.StreamBoundary( start_time=0, include_start=True, end_time=None, include_end=False ) - parsed_filters = parse_filters(query) + parsed_filters = parse_filters(query, user_subscriptions) if parsed_filters is None: return None ethereum_transactions = ( @@ -288,6 +291,7 @@ def next_event( data_access_token: str, stream_boundary: data.StreamBoundary, query: StreamQuery, + user_subscriptions: Dict[str, List[Dict[str, Any]]], ) -> Optional[data.Event]: """ Returns the earliest event occuring after the given stream boundary corresponding to the given @@ -304,7 +308,7 @@ def next_event( end_time=None, include_end=False, ) - parsed_filters = parse_filters(query) + parsed_filters = parse_filters(query, user_subscriptions) if parsed_filters is None: return None @@ -327,6 +331,7 @@ def previous_event( data_access_token: str, stream_boundary: data.StreamBoundary, query: StreamQuery, + user_subscriptions: Dict[str, List[Dict[str, Any]]], ) -> Optional[data.Event]: """ Returns the latest event occuring before the given stream boundary corresponding to the given @@ -343,7 +348,7 @@ def previous_event( end_time=stream_boundary.start_time, include_end=(not stream_boundary.include_start), ) - parsed_filters = parse_filters(query) + parsed_filters = parse_filters(query, user_subscriptions) if parsed_filters is None: return None maybe_ethereum_transaction = ( @@ -358,241 +363,3 @@ def previous_event( if maybe_ethereum_transaction is None: return None return ethereum_transaction_event(maybe_ethereum_transaction) - - -async def get_transaction_in_blocks( - db_session: Session, - query: str, - user_subscriptions_resources_by_address: Dict[str, Any], - boundaries: data.PageBoundary, -) -> data.EthereumTransactionResponse: - """ - Request transactions from database based on addresses from user subscriptions - and selected boundaries. - - streams empty for user without subscriptions - Return last available transaction if boundaries is empty - """ - - subscriptions_addresses = list(user_subscriptions_resources_by_address.keys()) - - if boundaries.start_time < 1438215988: # first block - boundaries.start_time = 0 - - if boundaries.end_time < 1438215988: # first block - boundaries.end_time = 0 - - if query == "" or query == " ": - - filters = [ - or_( - EthereumTransaction.to_address == address, - EthereumTransaction.from_address == address, - ) - for address in subscriptions_addresses - ] - filters = or_(*filters) - - else: - filters = parse_search_query_to_sqlalchemy_filters( - query, allowed_addresses=subscriptions_addresses - ) - if not filters: - return data.EthereumTransactionResponse( - stream=[], - boundaries=boundaries, - ) - filters = and_(*filters) - - ethereum_transactions_in_subscriptions = ( - db_session.query( - EthereumTransaction.hash, - EthereumTransaction.block_number, - EthereumTransaction.from_address, - EthereumTransaction.to_address, - EthereumTransaction.gas, - EthereumTransaction.gas_price, - EthereumTransaction.input, - EthereumTransaction.nonce, - EthereumTransaction.value, - EthereumBlock.timestamp.label("timestamp"), - ) - .join(EthereumBlock) - .filter(filters) - ) - - ethereum_transactions = ethereum_transactions_in_subscriptions - - # If not start_time and end_time not present - # Get latest transaction - if boundaries.end_time == 0: - ethereum_transaction_start_point = ( - ethereum_transactions_in_subscriptions.order_by( - text("timestamp desc") - ).limit(1) - ).one_or_none() - if ethereum_transaction_start_point: - boundaries.end_time = ethereum_transaction_start_point[-1] - boundaries.start_time = ( - ethereum_transaction_start_point[-1] - DEFAULT_STREAM_TIMEINTERVAL - ) - - if boundaries.start_time != 0 and boundaries.end_time != 0: - if boundaries.start_time > boundaries.end_time: - boundaries.start_time, boundaries.end_time = ( - boundaries.end_time, - boundaries.start_time, - ) - - if boundaries.end_time: - ethereum_transactions = ethereum_transactions.filter( - include_or_not_lower( - EthereumBlock.timestamp, boundaries.include_end, boundaries.end_time - ) - ) - - next_transaction = ( - ethereum_transactions_in_subscriptions.filter( - EthereumBlock.timestamp > boundaries.end_time - ) - .order_by(text("timestamp ASC")) - .limit(1) - ) - - next_transaction = next_transaction.one_or_none() - - if next_transaction: - boundaries.next_event_time = next_transaction[-1] - else: - boundaries.next_event_time = None - - if boundaries.start_time: - ethereum_transactions = ethereum_transactions.filter( - include_or_not_greater( - EthereumBlock.timestamp, - boundaries.include_start, - boundaries.start_time, - ) - ) - - previous_transaction = ( - ethereum_transactions_in_subscriptions.filter( - EthereumBlock.timestamp < boundaries.start_time - ) - .order_by(text("timestamp desc")) - .limit(1) - ).one_or_none() - - if previous_transaction: - boundaries.previous_event_time = previous_transaction[-1] - else: - boundaries.previous_event_time = None - - response = [] - for ( - hash, - block_number, - from_address, - to_address, - gas, - gas_price, - input, - nonce, - value, - timestamp, - ) in ethereum_transactions: - - # Apply subscription data to each transaction - subscription_type_id = None - from_label = None - to_label = None - color = None - - if from_address in subscriptions_addresses: - from_label = user_subscriptions_resources_by_address[from_address]["label"] - subscription_type_id = user_subscriptions_resources_by_address[ - from_address - ]["subscription_type_id"] - color = user_subscriptions_resources_by_address[from_address]["color"] - - if to_address in subscriptions_addresses: - subscription_type_id = user_subscriptions_resources_by_address[to_address][ - "subscription_type_id" - ] - to_label = user_subscriptions_resources_by_address[to_address]["label"] - color = user_subscriptions_resources_by_address[to_address]["color"] - - response.append( - data.EthereumTransactionItem( - color=color, - from_label=from_label, - to_label=to_label, - block_number=block_number, - gas=gas, - gasPrice=gas_price, - value=value, - from_address=from_address, - to_address=to_address, - hash=hash, - input=input, - nonce=nonce, - timestamp=timestamp, - subscription_type_id=subscription_type_id, - ) - ) - - return data.EthereumTransactionResponse(stream=response, boundaries=boundaries) - - -def include_or_not_greater(value1, include, value2): - if include: - return value1 >= value2 - else: - return value1 > value2 - - -def include_or_not_lower(value1, include, value2): - if include: - return value1 <= value2 - else: - return value1 < value2 - - -def parse_search_query_to_sqlalchemy_filters(q: str, allowed_addresses: List[str]): - - """ - Return list of sqlalchemy filters or empty list - """ - - filters = q.split("+") - constructed_filters = [] - for filter_item in filters: - if filter_item == "": - logger.warning("Skipping empty filter item") - continue - - # Try Google style search filters - components = filter_item.split(":") - if len(components) == 2: - filter_type = components[0] - filter_value = components[1] - else: - continue - - if filter_type == "to" and filter_value: - constructed_filters.append(EthereumTransaction.to_address == filter_value) - - if filter_type == "from" and filter_value: - if filter_value not in allowed_addresses: - continue - constructed_filters.append(EthereumTransaction.from_address == filter_value) - - if filter_type == "address" and filter_value: - constructed_filters.append( - or_( - EthereumTransaction.to_address == filter_value, - EthereumTransaction.from_address == filter_value, - ) - ) - - return constructed_filters diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 9c587cda..10a647e7 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -10,6 +10,7 @@ from fastapi import FastAPI, HTTPException, Request, Query, Depends from fastapi.middleware.cors import CORSMiddleware from moonstreamdb import db from sqlalchemy.orm import Session +from sqlalchemy.sql.functions import user from .. import data @@ -17,11 +18,14 @@ from ..middleware import BroodAuthMiddleware from ..providers import ethereum_blockchain from ..settings import ( DOCS_TARGET_PATH, + MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_DATA_JOURNAL_ID, ORIGINS, DOCS_PATHS, bugout_client as bc, BUGOUT_REQUEST_TIMEOUT_SECONDS, ) +from .. import stream_queries from .subscriptions import BUGOUT_RESOURCE_TYPE_SUBSCRIPTION from ..version import MOONSTREAM_VERSION @@ -66,10 +70,10 @@ def get_user_subscriptions(token: str) -> Dict[str, List[Dict[str, Any]]]: timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) - # TODO(kompotkot, zomglings): PAGINATION!!! + # TODO(andrey, kompotkot, zomglings): PAGINATION!!! user_subscriptions: Dict[str, List[Dict[str, Any]]] = {} for subscription in response.resources: - subscription_type = subscription.get("subscription_type_id") + subscription_type = subscription.resource_data.get("subscription_type_id") if subscription_type is None: continue if user_subscriptions.get(subscription_type) is None: @@ -79,56 +83,51 @@ def get_user_subscriptions(token: str) -> Dict[str, List[Dict[str, Any]]]: return user_subscriptions -@app.get("/", tags=["streams"]) +EVENT_PROVIDERS: Dict[str, Any] = {ethereum_blockchain.event_type: ethereum_blockchain} + + +@app.get("/", tags=["streams"], response_model=data.GetEventsResponse) async def search_transactions( request: Request, q: str = Query(""), - start_time: Optional[int] = Query(0), - end_time: Optional[int] = Query(0), - include_start: Optional[bool] = Query(False), - include_end: Optional[bool] = Query(False), + start_time: int = Query(0), + end_time: Optional[int] = Query(None), + include_start: bool = Query(False), + include_end: bool = Query(False), db_session: Session = Depends(db.yield_db_session), -): - # get user subscriptions - token = request.state.token - params = {"user_id": str(request.state.user.id)} - try: - # TODO(andrey, kompotkot): This query should filter resources of type "subscription". We may - # create other resources for users. When we do, I think this code will break. - # See how we apply this filter for "type": "subscription_type" in the /subscriptions route. - user_subscriptions_resources: BugoutResources = bc.list_resources( - token=token, params=params - ) - except BugoutResponseException as e: - if e.detail == "Resources not found": - return data.EthereumTransactionResponse(stream=[]) - raise HTTPException(status_code=e.status_code, detail=e.detail) - except Exception as e: - raise HTTPException(status_code=500) - - # TODO(andrey, kompotkot): Pagination over resources!! - # Issue: https://github.com/bugout-dev/brood/issues/14 - address_to_subscriptions = { - resource.resource_data["address"]: resource.resource_data - for resource in user_subscriptions_resources.resources - } - - boundaries = data.PageBoundary( +) -> data.GetEventsResponse: + stream_boundary = data.StreamBoundary( start_time=start_time, end_time=end_time, - next_event_time=0, - previous_event_time=0, include_start=include_start, include_end=include_end, ) - if address_to_subscriptions: - response = await ethereum_blockchain.get_transaction_in_blocks( - db_session=db_session, - query=q, - user_subscriptions_resources_by_address=address_to_subscriptions, - boundaries=boundaries, + user_subscriptions = get_user_subscriptions(request.state.token) + query = stream_queries.StreamQuery( + subscription_types=[subtype for subtype in EVENT_PROVIDERS], subscriptions=[] + ) + if q.strip() != "": + query = stream_queries.parse_query_string(q) + + results = { + event_type: provider.get_events( + db_session, + bc, + MOONSTREAM_DATA_JOURNAL_ID, + MOONSTREAM_ADMIN_ACCESS_TOKEN, + stream_boundary, + query, + user_subscriptions, ) - return response - else: - return data.EthereumTransactionResponse(stream=[], boundaries=boundaries) + for event_type, provider in EVENT_PROVIDERS.items() + } + events = [ + event + for _, event_list in results.values() + if event_list is not None + for event in event_list + ] + events.sort(key=lambda event: event.event_timestamp, reverse=True) + response = data.GetEventsResponse(stream_boundary=stream_boundary, events=events) + return response diff --git a/backend/moonstream/routes/users.py b/backend/moonstream/routes/users.py index 3cbb95ce..5101a25c 100644 --- a/backend/moonstream/routes/users.py +++ b/backend/moonstream/routes/users.py @@ -153,7 +153,9 @@ async def login_handler( application_id=MOONSTREAM_APPLICATION_ID, ) except BugoutResponseException as e: - raise HTTPException(status_code=e.status_code) + raise HTTPException( + status_code=e.status_code, detail=f"Error from Brood API: {e.detail}" + ) except Exception as e: raise HTTPException(status_code=500) return token