Hooked up ethereum_blockchain provider to /streams/ route

pull/105/head
Neeraj Kashyap 2021-08-19 08:09:58 -07:00
rodzic 8d07d0a53b
commit 5db3c60769
6 zmienionych plików z 64 dodań i 557 usunięć

Wyświetl plik

@ -6,4 +6,4 @@ set -e
MOONSTREAM_HOST="${MOONSTREAM_HOST:-0.0.0.0}"
MOONSTREAM_PORT="${MOONSTREAM_PORT:-7481}"
uvicorn --port "$MOONSTREAM_PORT" --host "$MOONSTREAM_HOST" moonstream.api:app --workers 2 $@
uvicorn --port "$MOONSTREAM_PORT" --host "$MOONSTREAM_HOST" moonstream.api:app --reload

Wyświetl plik

@ -1,268 +1,18 @@
from datetime import datetime
import logging
from typing import List, Optional
from typing import Dict, Any, List, Optional, Union
from sqlalchemy.engine.base import Transaction
from moonstreamdb.models import (
EthereumBlock,
EthereumTransaction,
EthereumPendingTransaction,
EthereumAddress,
EthereumLabel,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session
from . import data
from .settings import DEFAULT_STREAM_TIMEINTERVAL
logger = logging.getLogger(__name__)
async def get_transaction_in_blocks(
db_session: Session,
query: str,
user_subscriptions_resources_by_address: Dict[str, Any],
boundaries: data.PageBoundary,
) -> data.EthereumTransactionResponse:
"""
Request transactions from database based on addresses from user subscriptions
and selected boundaries.
streams empty for user without subscriptions
Return last available transaction if boundaries is empty
"""
subscriptions_addresses = list(user_subscriptions_resources_by_address.keys())
if boundaries.start_time < 1438215988: # first block
boundaries.start_time = 0
if boundaries.end_time < 1438215988: # first block
boundaries.end_time = 0
if query == "" or query == " ":
filters = [
or_(
EthereumTransaction.to_address == address,
EthereumTransaction.from_address == address,
)
for address in subscriptions_addresses
]
filters = or_(*filters)
else:
filters = parse_search_query_to_sqlalchemy_filters(
query, allowed_addresses=subscriptions_addresses
)
if not filters:
return data.EthereumTransactionResponse(
stream=[],
boundaries=boundaries,
)
filters = and_(*filters)
ethereum_transactions_in_subscriptions = (
db_session.query(
EthereumTransaction.hash,
EthereumTransaction.block_number,
EthereumTransaction.from_address,
EthereumTransaction.to_address,
EthereumTransaction.gas,
EthereumTransaction.gas_price,
EthereumTransaction.input,
EthereumTransaction.nonce,
EthereumTransaction.value,
EthereumBlock.timestamp.label("timestamp"),
)
.join(EthereumBlock)
.filter(filters)
)
ethereum_transactions = ethereum_transactions_in_subscriptions
# If not start_time and end_time not present
# Get latest transaction
if boundaries.end_time == 0:
ethereum_transaction_start_point = (
ethereum_transactions_in_subscriptions.order_by(
text("timestamp desc")
).limit(1)
).one_or_none()
if ethereum_transaction_start_point:
boundaries.end_time = ethereum_transaction_start_point[-1]
boundaries.start_time = (
ethereum_transaction_start_point[-1] - DEFAULT_STREAM_TIMEINTERVAL
)
if boundaries.start_time != 0 and boundaries.end_time != 0:
if boundaries.start_time > boundaries.end_time:
boundaries.start_time, boundaries.end_time = (
boundaries.end_time,
boundaries.start_time,
)
if boundaries.end_time:
ethereum_transactions = ethereum_transactions.filter(
include_or_not_lower(
EthereumBlock.timestamp, boundaries.include_end, boundaries.end_time
)
)
next_transaction = (
ethereum_transactions_in_subscriptions.filter(
EthereumBlock.timestamp > boundaries.end_time
)
.order_by(text("timestamp ASC"))
.limit(1)
)
next_transaction = next_transaction.one_or_none()
if next_transaction:
boundaries.next_event_time = next_transaction[-1]
else:
boundaries.next_event_time = None
if boundaries.start_time:
ethereum_transactions = ethereum_transactions.filter(
include_or_not_grater(
EthereumBlock.timestamp,
boundaries.include_start,
boundaries.start_time,
)
)
previous_transaction = (
ethereum_transactions_in_subscriptions.filter(
EthereumBlock.timestamp < boundaries.start_time
)
.order_by(text("timestamp desc"))
.limit(1)
).one_or_none()
if previous_transaction:
boundaries.previous_event_time = previous_transaction[-1]
else:
boundaries.previous_event_time = None
response = []
for (
hash,
block_number,
from_address,
to_address,
gas,
gas_price,
input,
nonce,
value,
timestamp,
) in ethereum_transactions:
# Apply subscription data to each transaction
subscription_type_id = None
from_label = None
to_label = None
color = None
if from_address in subscriptions_addresses:
from_label = user_subscriptions_resources_by_address[from_address]["label"]
subscription_type_id = user_subscriptions_resources_by_address[
from_address
]["subscription_type_id"]
color = user_subscriptions_resources_by_address[from_address]["color"]
if to_address in subscriptions_addresses:
subscription_type_id = user_subscriptions_resources_by_address[to_address][
"subscription_type_id"
]
to_label = user_subscriptions_resources_by_address[to_address]["label"]
color = user_subscriptions_resources_by_address[to_address]["color"]
response.append(
data.EthereumTransactionItem(
color=color,
from_label=from_label,
to_label=to_label,
block_number=block_number,
gas=gas,
gasPrice=gas_price,
value=value,
from_address=from_address,
to_address=to_address,
hash=hash,
input=input,
nonce=nonce,
timestamp=timestamp,
subscription_type_id=subscription_type_id,
)
)
return data.EthereumTransactionResponse(stream=response, boundaries=boundaries)
def include_or_not_grater(value1, include, value2):
if include:
return value1 >= value2
else:
return value1 > value2
def include_or_not_lower(value1, include, value2):
if include:
return value1 <= value2
else:
return value1 < value2
def parse_search_query_to_sqlalchemy_filters(q: str, allowed_addresses: List[str]):
"""
Return list of sqlalchemy filters or empty list
"""
filters = q.split("+")
constructed_filters = []
for filter_item in filters:
if filter_item == "":
logger.warning("Skipping empty filter item")
continue
# Try Google style search filters
components = filter_item.split(":")
if len(components) == 2:
filter_type = components[0]
filter_value = components[1]
else:
continue
if filter_type == "to" and filter_value:
constructed_filters.append(EthereumTransaction.to_address == filter_value)
if filter_type == "from" and filter_value:
if filter_value not in allowed_addresses:
continue
constructed_filters.append(EthereumTransaction.from_address == filter_value)
if filter_type == "address" and filter_value:
constructed_filters.append(
or_(
EthereumTransaction.to_address == filter_value,
EthereumTransaction.from_address == filter_value,
)
)
return constructed_filters
def get_address_labels(
db_session: Session, start: int, limit: int, addresses: Optional[List[str]] = None
) -> List[EthereumAddress]:

Wyświetl plik

@ -127,28 +127,17 @@ class StreamBoundary(BaseModel):
include_end: bool = False
class PageBoundary(StreamBoundary):
"""
A PageBoundary adds information about previous and subsequent events to a StreamBoundary.
This additional information helps callers manage their views into a stream.
"""
next_event_time: Optional[int] = None
previous_event_time: Optional[int] = None
class EthereumTransactionResponse(BaseModel):
stream: List[EthereumTransactionItem]
boundaries: Optional[PageBoundary]
class Event(BaseModel):
event_type: str
event_timestamp: int # Seconds since epoch
event_data: Dict[str, Any] = Field(default_factory=dict)
class GetEventsResponse(BaseModel):
stream_boundary: StreamBoundary
events: List[Event] = Field(default_factory=list)
class TxinfoEthereumBlockchainRequest(BaseModel):
tx: EthereumTransaction

Wyświetl plik

@ -11,6 +11,7 @@ from moonstreamdb.models import (
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session, Query
from sqlalchemy.sql.functions import user
from .. import data
from ..settings import DEFAULT_STREAM_TIMEINTERVAL
@ -93,9 +94,9 @@ def parse_filters(
return None
subscribed_addresses = {
subscription.get("address")
subscription.resource_data.get("address")
for subscription in provider_subscriptions
if subscription.get("address") is not None
if subscription.resource_data.get("address") is not None
}
requires_ethereum_blockchain_data = False
@ -222,6 +223,7 @@ def get_events(
data_access_token: str,
stream_boundary: data.StreamBoundary,
query: StreamQuery,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
"""
Returns ethereum_blockchain events for the given addresses in the time period represented
@ -229,7 +231,7 @@ def get_events(
If the query does not require any data from this provider, returns None.
"""
parsed_filters = parse_filters(query)
parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None:
return None
@ -257,6 +259,7 @@ def latest_events(
data_access_token: str,
query: StreamQuery,
num_events: int,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
) -> Optional[List[data.Event]]:
"""
Returns the num_events latest events from the current provider, subject to the constraints imposed
@ -269,7 +272,7 @@ def latest_events(
stream_boundary = data.StreamBoundary(
start_time=0, include_start=True, end_time=None, include_end=False
)
parsed_filters = parse_filters(query)
parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None:
return None
ethereum_transactions = (
@ -288,6 +291,7 @@ def next_event(
data_access_token: str,
stream_boundary: data.StreamBoundary,
query: StreamQuery,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
) -> Optional[data.Event]:
"""
Returns the earliest event occuring after the given stream boundary corresponding to the given
@ -304,7 +308,7 @@ def next_event(
end_time=None,
include_end=False,
)
parsed_filters = parse_filters(query)
parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None:
return None
@ -327,6 +331,7 @@ def previous_event(
data_access_token: str,
stream_boundary: data.StreamBoundary,
query: StreamQuery,
user_subscriptions: Dict[str, List[Dict[str, Any]]],
) -> Optional[data.Event]:
"""
Returns the latest event occuring before the given stream boundary corresponding to the given
@ -343,7 +348,7 @@ def previous_event(
end_time=stream_boundary.start_time,
include_end=(not stream_boundary.include_start),
)
parsed_filters = parse_filters(query)
parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None:
return None
maybe_ethereum_transaction = (
@ -358,241 +363,3 @@ def previous_event(
if maybe_ethereum_transaction is None:
return None
return ethereum_transaction_event(maybe_ethereum_transaction)
async def get_transaction_in_blocks(
db_session: Session,
query: str,
user_subscriptions_resources_by_address: Dict[str, Any],
boundaries: data.PageBoundary,
) -> data.EthereumTransactionResponse:
"""
Request transactions from database based on addresses from user subscriptions
and selected boundaries.
streams empty for user without subscriptions
Return last available transaction if boundaries is empty
"""
subscriptions_addresses = list(user_subscriptions_resources_by_address.keys())
if boundaries.start_time < 1438215988: # first block
boundaries.start_time = 0
if boundaries.end_time < 1438215988: # first block
boundaries.end_time = 0
if query == "" or query == " ":
filters = [
or_(
EthereumTransaction.to_address == address,
EthereumTransaction.from_address == address,
)
for address in subscriptions_addresses
]
filters = or_(*filters)
else:
filters = parse_search_query_to_sqlalchemy_filters(
query, allowed_addresses=subscriptions_addresses
)
if not filters:
return data.EthereumTransactionResponse(
stream=[],
boundaries=boundaries,
)
filters = and_(*filters)
ethereum_transactions_in_subscriptions = (
db_session.query(
EthereumTransaction.hash,
EthereumTransaction.block_number,
EthereumTransaction.from_address,
EthereumTransaction.to_address,
EthereumTransaction.gas,
EthereumTransaction.gas_price,
EthereumTransaction.input,
EthereumTransaction.nonce,
EthereumTransaction.value,
EthereumBlock.timestamp.label("timestamp"),
)
.join(EthereumBlock)
.filter(filters)
)
ethereum_transactions = ethereum_transactions_in_subscriptions
# If not start_time and end_time not present
# Get latest transaction
if boundaries.end_time == 0:
ethereum_transaction_start_point = (
ethereum_transactions_in_subscriptions.order_by(
text("timestamp desc")
).limit(1)
).one_or_none()
if ethereum_transaction_start_point:
boundaries.end_time = ethereum_transaction_start_point[-1]
boundaries.start_time = (
ethereum_transaction_start_point[-1] - DEFAULT_STREAM_TIMEINTERVAL
)
if boundaries.start_time != 0 and boundaries.end_time != 0:
if boundaries.start_time > boundaries.end_time:
boundaries.start_time, boundaries.end_time = (
boundaries.end_time,
boundaries.start_time,
)
if boundaries.end_time:
ethereum_transactions = ethereum_transactions.filter(
include_or_not_lower(
EthereumBlock.timestamp, boundaries.include_end, boundaries.end_time
)
)
next_transaction = (
ethereum_transactions_in_subscriptions.filter(
EthereumBlock.timestamp > boundaries.end_time
)
.order_by(text("timestamp ASC"))
.limit(1)
)
next_transaction = next_transaction.one_or_none()
if next_transaction:
boundaries.next_event_time = next_transaction[-1]
else:
boundaries.next_event_time = None
if boundaries.start_time:
ethereum_transactions = ethereum_transactions.filter(
include_or_not_greater(
EthereumBlock.timestamp,
boundaries.include_start,
boundaries.start_time,
)
)
previous_transaction = (
ethereum_transactions_in_subscriptions.filter(
EthereumBlock.timestamp < boundaries.start_time
)
.order_by(text("timestamp desc"))
.limit(1)
).one_or_none()
if previous_transaction:
boundaries.previous_event_time = previous_transaction[-1]
else:
boundaries.previous_event_time = None
response = []
for (
hash,
block_number,
from_address,
to_address,
gas,
gas_price,
input,
nonce,
value,
timestamp,
) in ethereum_transactions:
# Apply subscription data to each transaction
subscription_type_id = None
from_label = None
to_label = None
color = None
if from_address in subscriptions_addresses:
from_label = user_subscriptions_resources_by_address[from_address]["label"]
subscription_type_id = user_subscriptions_resources_by_address[
from_address
]["subscription_type_id"]
color = user_subscriptions_resources_by_address[from_address]["color"]
if to_address in subscriptions_addresses:
subscription_type_id = user_subscriptions_resources_by_address[to_address][
"subscription_type_id"
]
to_label = user_subscriptions_resources_by_address[to_address]["label"]
color = user_subscriptions_resources_by_address[to_address]["color"]
response.append(
data.EthereumTransactionItem(
color=color,
from_label=from_label,
to_label=to_label,
block_number=block_number,
gas=gas,
gasPrice=gas_price,
value=value,
from_address=from_address,
to_address=to_address,
hash=hash,
input=input,
nonce=nonce,
timestamp=timestamp,
subscription_type_id=subscription_type_id,
)
)
return data.EthereumTransactionResponse(stream=response, boundaries=boundaries)
def include_or_not_greater(value1, include, value2):
if include:
return value1 >= value2
else:
return value1 > value2
def include_or_not_lower(value1, include, value2):
if include:
return value1 <= value2
else:
return value1 < value2
def parse_search_query_to_sqlalchemy_filters(q: str, allowed_addresses: List[str]):
"""
Return list of sqlalchemy filters or empty list
"""
filters = q.split("+")
constructed_filters = []
for filter_item in filters:
if filter_item == "":
logger.warning("Skipping empty filter item")
continue
# Try Google style search filters
components = filter_item.split(":")
if len(components) == 2:
filter_type = components[0]
filter_value = components[1]
else:
continue
if filter_type == "to" and filter_value:
constructed_filters.append(EthereumTransaction.to_address == filter_value)
if filter_type == "from" and filter_value:
if filter_value not in allowed_addresses:
continue
constructed_filters.append(EthereumTransaction.from_address == filter_value)
if filter_type == "address" and filter_value:
constructed_filters.append(
or_(
EthereumTransaction.to_address == filter_value,
EthereumTransaction.from_address == filter_value,
)
)
return constructed_filters

Wyświetl plik

@ -10,6 +10,7 @@ from fastapi import FastAPI, HTTPException, Request, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb import db
from sqlalchemy.orm import Session
from sqlalchemy.sql.functions import user
from .. import data
@ -17,11 +18,14 @@ from ..middleware import BroodAuthMiddleware
from ..providers import ethereum_blockchain
from ..settings import (
DOCS_TARGET_PATH,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
ORIGINS,
DOCS_PATHS,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from .. import stream_queries
from .subscriptions import BUGOUT_RESOURCE_TYPE_SUBSCRIPTION
from ..version import MOONSTREAM_VERSION
@ -66,10 +70,10 @@ def get_user_subscriptions(token: str) -> Dict[str, List[Dict[str, Any]]]:
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
# TODO(kompotkot, zomglings): PAGINATION!!!
# TODO(andrey, kompotkot, zomglings): PAGINATION!!!
user_subscriptions: Dict[str, List[Dict[str, Any]]] = {}
for subscription in response.resources:
subscription_type = subscription.get("subscription_type_id")
subscription_type = subscription.resource_data.get("subscription_type_id")
if subscription_type is None:
continue
if user_subscriptions.get(subscription_type) is None:
@ -79,56 +83,51 @@ def get_user_subscriptions(token: str) -> Dict[str, List[Dict[str, Any]]]:
return user_subscriptions
@app.get("/", tags=["streams"])
EVENT_PROVIDERS: Dict[str, Any] = {ethereum_blockchain.event_type: ethereum_blockchain}
@app.get("/", tags=["streams"], response_model=data.GetEventsResponse)
async def search_transactions(
request: Request,
q: str = Query(""),
start_time: Optional[int] = Query(0),
end_time: Optional[int] = Query(0),
include_start: Optional[bool] = Query(False),
include_end: Optional[bool] = Query(False),
start_time: int = Query(0),
end_time: Optional[int] = Query(None),
include_start: bool = Query(False),
include_end: bool = Query(False),
db_session: Session = Depends(db.yield_db_session),
):
# get user subscriptions
token = request.state.token
params = {"user_id": str(request.state.user.id)}
try:
# TODO(andrey, kompotkot): This query should filter resources of type "subscription". We may
# create other resources for users. When we do, I think this code will break.
# See how we apply this filter for "type": "subscription_type" in the /subscriptions route.
user_subscriptions_resources: BugoutResources = bc.list_resources(
token=token, params=params
)
except BugoutResponseException as e:
if e.detail == "Resources not found":
return data.EthereumTransactionResponse(stream=[])
raise HTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise HTTPException(status_code=500)
# TODO(andrey, kompotkot): Pagination over resources!!
# Issue: https://github.com/bugout-dev/brood/issues/14
address_to_subscriptions = {
resource.resource_data["address"]: resource.resource_data
for resource in user_subscriptions_resources.resources
}
boundaries = data.PageBoundary(
) -> data.GetEventsResponse:
stream_boundary = data.StreamBoundary(
start_time=start_time,
end_time=end_time,
next_event_time=0,
previous_event_time=0,
include_start=include_start,
include_end=include_end,
)
if address_to_subscriptions:
response = await ethereum_blockchain.get_transaction_in_blocks(
db_session=db_session,
query=q,
user_subscriptions_resources_by_address=address_to_subscriptions,
boundaries=boundaries,
user_subscriptions = get_user_subscriptions(request.state.token)
query = stream_queries.StreamQuery(
subscription_types=[subtype for subtype in EVENT_PROVIDERS], subscriptions=[]
)
if q.strip() != "":
query = stream_queries.parse_query_string(q)
results = {
event_type: provider.get_events(
db_session,
bc,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
stream_boundary,
query,
user_subscriptions,
)
return response
else:
return data.EthereumTransactionResponse(stream=[], boundaries=boundaries)
for event_type, provider in EVENT_PROVIDERS.items()
}
events = [
event
for _, event_list in results.values()
if event_list is not None
for event in event_list
]
events.sort(key=lambda event: event.event_timestamp, reverse=True)
response = data.GetEventsResponse(stream_boundary=stream_boundary, events=events)
return response

Wyświetl plik

@ -153,7 +153,9 @@ async def login_handler(
application_id=MOONSTREAM_APPLICATION_ID,
)
except BugoutResponseException as e:
raise HTTPException(status_code=e.status_code)
raise HTTPException(
status_code=e.status_code, detail=f"Error from Brood API: {e.detail}"
)
except Exception as e:
raise HTTPException(status_code=500)
return token