kopia lustrzana https://github.com/bugout-dev/moonstream
Merge branch 'main' into customized-dashboard
commit
fd8ee961f1
|
@ -7,7 +7,7 @@ User=ubuntu
|
|||
Group=www-data
|
||||
WorkingDirectory=/home/ubuntu/moonstream/backend
|
||||
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/uvicorn --host 0.0.0.0 --port 7481 --workers 8 moonstreamapi.api:app
|
||||
ExecStart=/home/ubuntu/moonstream-env/bin/uvicorn --proxy-headers --forwarded-allow-ips='127.0.0.1' --host 127.0.0.1 --port 7481 --workers 8 moonstreamapi.api:app
|
||||
SyslogIdentifier=moonstreamapi
|
||||
|
||||
[Install]
|
||||
|
|
|
@ -40,6 +40,8 @@ logger = logging.getLogger(__name__)
|
|||
blockchain_by_subscription_id = {
|
||||
"ethereum_blockchain": "ethereum",
|
||||
"polygon_blockchain": "polygon",
|
||||
"ethereum_smartcontract": "ethereum",
|
||||
"polygon_smartcontract": "polygon",
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -17,6 +17,26 @@ from ..settings import (
|
|||
from ..settings import bugout_client as bc
|
||||
|
||||
CANONICAL_SUBSCRIPTION_TYPES = {
|
||||
"ethereum_smartcontract": SubscriptionTypeResourceData(
|
||||
id="ethereum_smartcontract",
|
||||
name="Ethereum smartcontracts",
|
||||
choices=["input:address", "tag:erc721"],
|
||||
description="Contracts events and tx_calls of contract of Ethereum blockchain",
|
||||
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/ethereum/eth-diamond-purple.png",
|
||||
stripe_product_id=None,
|
||||
stripe_price_id=None,
|
||||
active=False,
|
||||
),
|
||||
"polygon_smartcontract": SubscriptionTypeResourceData(
|
||||
id="polygon_smartcontract",
|
||||
name="Polygon smartcontracts",
|
||||
choices=["input:address", "tag:erc721"],
|
||||
description="Contracts events and tx_calls of contract of Polygon blockchain",
|
||||
icon_url="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/matic-token-inverted-icon.png",
|
||||
stripe_product_id=None,
|
||||
stripe_price_id=None,
|
||||
active=True,
|
||||
),
|
||||
"ethereum_blockchain": SubscriptionTypeResourceData(
|
||||
id="ethereum_blockchain",
|
||||
name="Ethereum transactions",
|
||||
|
|
|
@ -32,9 +32,9 @@ from bugout.app import Bugout
|
|||
from bugout.data import BugoutResource
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from . import bugout, transactions, moonworm_provider
|
||||
from .. import data
|
||||
from ..stream_queries import StreamQuery
|
||||
from . import bugout, ethereum_blockchain
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.WARN)
|
||||
|
@ -47,8 +47,13 @@ class ReceivingEventsException(Exception):
|
|||
|
||||
|
||||
event_providers: Dict[str, Any] = {
|
||||
ethereum_blockchain.event_type: ethereum_blockchain,
|
||||
bugout.whalewatch_provider.event_type: bugout.whalewatch_provider,
|
||||
moonworm_provider.EthereumMoonwormProvider.event_type: moonworm_provider.EthereumMoonwormProvider,
|
||||
moonworm_provider.PolygonMoonwormProvider.event_type: moonworm_provider.PolygonMoonwormProvider,
|
||||
transactions.EthereumTransactions.event_type: transactions.EthereumTransactions,
|
||||
transactions.PolygonTransactions.event_type: transactions.PolygonTransactions,
|
||||
bugout.polygon_whalewatch_provider.event_type: bugout.polygon_whalewatch_provider,
|
||||
bugout.ethereum_txpool_provider.event_type: bugout.ethereum_txpool_provider,
|
||||
bugout.ethereum_whalewatch_provider.event_type: bugout.ethereum_whalewatch_provider,
|
||||
bugout.ethereum_txpool_provider.event_type: bugout.ethereum_txpool_provider,
|
||||
}
|
||||
|
||||
|
|
|
@ -336,7 +336,7 @@ Shows the top 10 addresses active on the Ethereum blockchain over the last hour
|
|||
4. Amount (in WEI) received
|
||||
|
||||
To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
||||
whalewatch_provider = BugoutEventProvider(
|
||||
ethereum_whalewatch_provider = BugoutEventProvider(
|
||||
event_type="ethereum_whalewatch",
|
||||
description=whalewatch_description,
|
||||
default_time_interval_seconds=310,
|
||||
|
@ -344,6 +344,14 @@ whalewatch_provider = BugoutEventProvider(
|
|||
tags=["crawl_type:ethereum_trending"],
|
||||
)
|
||||
|
||||
polygon_whalewatch_provider = BugoutEventProvider(
|
||||
event_type="polygon_whalewatch",
|
||||
description=whalewatch_description,
|
||||
default_time_interval_seconds=310,
|
||||
estimated_events_per_time_interval=1,
|
||||
tags=["crawl_type:polygon_trending"],
|
||||
)
|
||||
|
||||
ethereum_txpool_description = """Event provider for Ethereum transaction pool.
|
||||
|
||||
Shows the latest events (from the previous hour) in the Ethereum transaction pool.
|
||||
|
|
|
@ -1,433 +0,0 @@
|
|||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Tuple, cast
|
||||
|
||||
from bugout.app import Bugout
|
||||
from bugout.data import BugoutResource
|
||||
from moonstreamdb.models import EthereumBlock, EthereumLabel, EthereumTransaction
|
||||
from sqlalchemy import and_, or_, text
|
||||
from sqlalchemy.orm import Query, Session
|
||||
from sqlalchemy.sql.functions import user
|
||||
|
||||
from .. import data
|
||||
from ..stream_boundaries import validate_stream_boundary
|
||||
from ..stream_queries import StreamQuery
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.WARN)
|
||||
|
||||
|
||||
event_type = "ethereum_blockchain"
|
||||
allowed_tags = ["tag:erc721"]
|
||||
|
||||
description = f"""Event provider for transactions from the Ethereum blockchain.
|
||||
|
||||
To restrict your queries to this provider, add a filter of \"type:{event_type}\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
||||
|
||||
default_time_interval_seconds: int = 5 * 60
|
||||
|
||||
# 200 transactions per block, 4 blocks per minute.
|
||||
estimated_events_per_time_interval: float = 5 * 800
|
||||
|
||||
|
||||
def validate_subscription(
|
||||
subscription_resource_data: data.SubscriptionResourceData,
|
||||
) -> Tuple[bool, List[str]]:
|
||||
"""
|
||||
Checks that the subscription represents a valid subscription to an Ethereum address.
|
||||
|
||||
NOTE: Currently, this function only checks that the address is a nonempty string.
|
||||
"""
|
||||
errors: List[str] = []
|
||||
if subscription_resource_data.address == "":
|
||||
errors.append("address is empty")
|
||||
|
||||
if subscription_resource_data.subscription_type_id != event_type:
|
||||
errors.append(
|
||||
f"Invalid subscription_type ({subscription_resource_data.subscription_type_id}). Expected: {event_type}."
|
||||
)
|
||||
|
||||
if errors:
|
||||
return False, errors
|
||||
return True, errors
|
||||
|
||||
|
||||
def stream_boundary_validator(
|
||||
stream_boundary: data.StreamBoundary,
|
||||
) -> data.StreamBoundary:
|
||||
"""
|
||||
Stream boundary validator for the ethereum_blockchain event provider.
|
||||
|
||||
Checks that stream boundaries do not exceed periods of greater than 2 hours.
|
||||
|
||||
Raises an error for invalid stream boundaries, else returns None.
|
||||
"""
|
||||
valid_period_seconds = 2 * 60 * 60
|
||||
_, stream_boundary = validate_stream_boundary(
|
||||
stream_boundary, valid_period_seconds, raise_when_invalid=True
|
||||
)
|
||||
return stream_boundary
|
||||
|
||||
|
||||
@dataclass
|
||||
class Filters:
|
||||
"""
|
||||
ethereum_blockchain event filters act as a disjunction over queries specifying a from address
|
||||
or a to address.
|
||||
"""
|
||||
|
||||
from_addresses: List[str] = field(default_factory=list)
|
||||
to_addresses: List[str] = field(default_factory=list)
|
||||
labels: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
def default_filters(subscriptions: List[BugoutResource]) -> Filters:
|
||||
"""
|
||||
Default filter strings for the given list of subscriptions.
|
||||
"""
|
||||
filters = Filters()
|
||||
for subscription in subscriptions:
|
||||
subscription_address = cast(
|
||||
Optional[str], subscription.resource_data.get("address")
|
||||
)
|
||||
if subscription_address is not None:
|
||||
if subscription_address in allowed_tags:
|
||||
filters.labels.append(subscription_address.split(":")[1])
|
||||
else:
|
||||
filters.from_addresses.append(subscription_address)
|
||||
filters.to_addresses.append(subscription_address)
|
||||
else:
|
||||
logger.warn(
|
||||
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
||||
)
|
||||
return filters
|
||||
|
||||
|
||||
def parse_filters(
|
||||
query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
|
||||
) -> Optional[Filters]:
|
||||
"""
|
||||
Passes raw filter strings into a Filters object which is used to construct a database query
|
||||
for ethereum transactions.
|
||||
|
||||
Filter syntax is:
|
||||
- "from:<address>" - specifies that we want to include all transactions with "<address>" as a source
|
||||
- "to:<address>" - specifies that we want to include all transactions with "<address>" as a destination
|
||||
- "<address>" - specifies that we want to include all transactions with "<address>" as a source AND all transactions with "<address>" as a destination
|
||||
|
||||
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
|
||||
None indicating that the StreamQuery does not require any data from this provider.
|
||||
"""
|
||||
if query.subscription_types and not any(
|
||||
subtype == event_type for subtype in query.subscription_types
|
||||
):
|
||||
return None
|
||||
|
||||
provider_subscriptions = user_subscriptions.get(event_type)
|
||||
|
||||
# If the user has no subscriptions to this event type, we do not have to return any data!
|
||||
if not provider_subscriptions:
|
||||
return None
|
||||
parsed_filters = default_filters(provider_subscriptions)
|
||||
|
||||
from_prefix_length = len("from:")
|
||||
to_prefix_length = len("to:")
|
||||
|
||||
subscribed_addresses = {
|
||||
subscription.resource_data.get("address")
|
||||
for subscription in provider_subscriptions
|
||||
if subscription.resource_data.get("address") is not None
|
||||
}
|
||||
|
||||
if query.subscriptions:
|
||||
parsed_filters.from_addresses = []
|
||||
parsed_filters.to_addresses = []
|
||||
for provider_type, raw_filter in query.subscriptions:
|
||||
if provider_type != event_type:
|
||||
continue
|
||||
|
||||
if raw_filter.startswith("from:"):
|
||||
address = raw_filter[from_prefix_length:]
|
||||
if address in subscribed_addresses:
|
||||
parsed_filters.from_addresses.append(address)
|
||||
elif raw_filter.startswith("to:"):
|
||||
address = raw_filter[to_prefix_length:]
|
||||
if address in subscribed_addresses:
|
||||
parsed_filters.to_addresses.append(address)
|
||||
else:
|
||||
address = raw_filter
|
||||
if address in subscribed_addresses:
|
||||
parsed_filters.from_addresses.append(address)
|
||||
parsed_filters.to_addresses.append(address)
|
||||
|
||||
if not (
|
||||
parsed_filters.from_addresses
|
||||
or parsed_filters.to_addresses
|
||||
or parsed_filters.labels
|
||||
):
|
||||
return None
|
||||
|
||||
return parsed_filters
|
||||
|
||||
|
||||
def query_ethereum_transactions(
|
||||
db_session: Session,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
parsed_filters: Filters,
|
||||
) -> Query:
|
||||
"""
|
||||
Builds a database query for Ethereum transactions that occurred within the window of time that
|
||||
the given stream_boundary represents and satisfying the constraints of parsed_filters.
|
||||
"""
|
||||
query = db_session.query(
|
||||
EthereumTransaction.hash,
|
||||
EthereumTransaction.block_number,
|
||||
EthereumTransaction.from_address,
|
||||
EthereumTransaction.to_address,
|
||||
EthereumTransaction.gas,
|
||||
EthereumTransaction.gas_price,
|
||||
EthereumTransaction.input,
|
||||
EthereumTransaction.nonce,
|
||||
EthereumTransaction.value,
|
||||
EthereumBlock.timestamp.label("timestamp"),
|
||||
).join(
|
||||
EthereumBlock,
|
||||
EthereumTransaction.block_number == EthereumBlock.block_number,
|
||||
)
|
||||
|
||||
if stream_boundary.include_start:
|
||||
query = query.filter(EthereumBlock.timestamp >= stream_boundary.start_time)
|
||||
else:
|
||||
query = query.filter(EthereumBlock.timestamp > stream_boundary.start_time)
|
||||
|
||||
if stream_boundary.end_time is not None:
|
||||
if stream_boundary.include_end:
|
||||
query = query.filter(EthereumBlock.timestamp <= stream_boundary.end_time)
|
||||
else:
|
||||
query = query.filter(EthereumBlock.timestamp <= stream_boundary.end_time)
|
||||
|
||||
# We want to take a big disjunction (OR) over ALL the filters, be they on "from" address or "to" address
|
||||
address_clauses = []
|
||||
|
||||
address_clauses.extend(
|
||||
[
|
||||
EthereumTransaction.from_address == address
|
||||
for address in parsed_filters.from_addresses
|
||||
]
|
||||
+ [
|
||||
EthereumTransaction.to_address == address
|
||||
for address in parsed_filters.to_addresses
|
||||
]
|
||||
)
|
||||
|
||||
labels_clause = []
|
||||
|
||||
if parsed_filters.labels:
|
||||
label_clause = (
|
||||
db_session.query(EthereumLabel)
|
||||
.filter(
|
||||
or_(
|
||||
*[
|
||||
EthereumLabel.label.contains(label)
|
||||
for label in list(set(parsed_filters.labels))
|
||||
]
|
||||
)
|
||||
)
|
||||
.exists()
|
||||
)
|
||||
labels_clause.append(label_clause)
|
||||
|
||||
subscriptions_clause = address_clauses + labels_clause
|
||||
|
||||
if subscriptions_clause:
|
||||
query = query.filter(or_(*subscriptions_clause))
|
||||
|
||||
return query
|
||||
|
||||
|
||||
def ethereum_transaction_event(row: Tuple) -> data.Event:
|
||||
"""
|
||||
Parses a result from the result set of a database query for Ethereum transactions with block timestamp
|
||||
into an Event object.
|
||||
"""
|
||||
(
|
||||
hash,
|
||||
block_number,
|
||||
from_address,
|
||||
to_address,
|
||||
gas,
|
||||
gas_price,
|
||||
input,
|
||||
nonce,
|
||||
value,
|
||||
timestamp,
|
||||
) = row
|
||||
return data.Event(
|
||||
event_type=event_type,
|
||||
event_timestamp=timestamp,
|
||||
event_data={
|
||||
"hash": hash,
|
||||
"block_number": block_number,
|
||||
"from": from_address,
|
||||
"to": to_address,
|
||||
"gas": gas,
|
||||
"gas_price": gas_price,
|
||||
"input": input,
|
||||
"nonce": nonce,
|
||||
"value": value,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def get_events(
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
|
||||
"""
|
||||
Returns ethereum_blockchain events for the given addresses in the time period represented
|
||||
by stream_boundary.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
stream_boundary = stream_boundary_validator(stream_boundary)
|
||||
|
||||
parsed_filters = parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
|
||||
ethereum_transactions = query_ethereum_transactions(
|
||||
db_session, stream_boundary, parsed_filters
|
||||
)
|
||||
|
||||
ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc"))
|
||||
|
||||
# TODO(zomglings): Catch the operational error denoting that the statement timed out here
|
||||
# and wrap it in an error that tells the API to return the appropriate 400 response. Currently,
|
||||
# when the statement times out, the API returns a 500 status code to the client, which doesn't
|
||||
# do anything to help them get data from teh backend.
|
||||
# The error message on the API side when the statement times out:
|
||||
# > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
|
||||
events: List[data.Event] = [
|
||||
ethereum_transaction_event(row) for row in ethereum_transactions
|
||||
]
|
||||
|
||||
if (stream_boundary.end_time is None) and events:
|
||||
stream_boundary.end_time = events[0].event_timestamp
|
||||
stream_boundary.include_end = True
|
||||
|
||||
return stream_boundary, events
|
||||
|
||||
|
||||
def latest_events(
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
query: StreamQuery,
|
||||
num_events: int,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[List[data.Event]]:
|
||||
"""
|
||||
Returns the num_events latest events from the current provider, subject to the constraints imposed
|
||||
by the given filters.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert num_events > 0, f"num_events ({num_events}) should be positive."
|
||||
|
||||
stream_boundary = data.StreamBoundary(
|
||||
start_time=0, include_start=True, end_time=None, include_end=False
|
||||
)
|
||||
parsed_filters = parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
ethereum_transactions = (
|
||||
query_ethereum_transactions(db_session, stream_boundary, parsed_filters)
|
||||
.order_by(text("timestamp desc"))
|
||||
.limit(num_events)
|
||||
)
|
||||
|
||||
return [ethereum_transaction_event(row) for row in ethereum_transactions]
|
||||
|
||||
|
||||
def next_event(
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[data.Event]:
|
||||
"""
|
||||
Returns the earliest event occuring after the given stream boundary corresponding to the given
|
||||
query from this provider.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert (
|
||||
stream_boundary.end_time is not None
|
||||
), "Cannot return next event for up-to-date stream boundary"
|
||||
next_stream_boundary = data.StreamBoundary(
|
||||
start_time=stream_boundary.end_time,
|
||||
include_start=(not stream_boundary.include_end),
|
||||
end_time=None,
|
||||
include_end=False,
|
||||
)
|
||||
parsed_filters = parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
|
||||
maybe_ethereum_transaction = (
|
||||
query_ethereum_transactions(db_session, next_stream_boundary, parsed_filters)
|
||||
.order_by(text("timestamp asc"))
|
||||
.limit(1)
|
||||
).one_or_none()
|
||||
|
||||
if maybe_ethereum_transaction is None:
|
||||
return None
|
||||
return ethereum_transaction_event(maybe_ethereum_transaction)
|
||||
|
||||
|
||||
def previous_event(
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[data.Event]:
|
||||
"""
|
||||
Returns the latest event occuring before the given stream boundary corresponding to the given
|
||||
query from this provider.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert (
|
||||
stream_boundary.start_time != 0
|
||||
), "Cannot return previous event for stream starting at time 0"
|
||||
previous_stream_boundary = data.StreamBoundary(
|
||||
start_time=0,
|
||||
include_start=True,
|
||||
end_time=stream_boundary.start_time,
|
||||
include_end=(not stream_boundary.include_start),
|
||||
)
|
||||
parsed_filters = parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
maybe_ethereum_transaction = (
|
||||
query_ethereum_transactions(
|
||||
db_session, previous_stream_boundary, parsed_filters
|
||||
)
|
||||
.order_by(text("timestamp desc"))
|
||||
.limit(1)
|
||||
).one_or_none()
|
||||
if maybe_ethereum_transaction is None:
|
||||
return None
|
||||
return ethereum_transaction_event(maybe_ethereum_transaction)
|
|
@ -0,0 +1,410 @@
|
|||
from dataclasses import dataclass, field
|
||||
import logging
|
||||
from typing import cast, Dict, Any, List, Optional, Tuple
|
||||
|
||||
from bugout.app import Bugout
|
||||
from bugout.data import BugoutResource
|
||||
|
||||
from moonstreamdb.blockchain import (
|
||||
get_label_model,
|
||||
AvailableBlockchainType,
|
||||
)
|
||||
from sqlalchemy import or_, and_, text
|
||||
from sqlalchemy.orm import Session, Query, query_expression
|
||||
from sqlalchemy.sql.expression import label
|
||||
|
||||
|
||||
from .. import data
|
||||
from ..stream_boundaries import validate_stream_boundary
|
||||
from ..stream_queries import StreamQuery
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.WARN)
|
||||
|
||||
|
||||
ethereum_event_type = "ethereum_blockchain"
|
||||
polygon_event_type = "polygon_blockchain"
|
||||
allowed_tags = ["tag:erc721"]
|
||||
|
||||
description = f"""Event provider for transactions from the Ethereum blockchain.
|
||||
|
||||
To restrict your queries to this provider, add a filter of \"type:{ethereum_event_type}\{polygon_event_type}\" to your query (query parameter: \"q\") on the /streams endpoint."""
|
||||
|
||||
default_time_interval_seconds: int = 5 * 60
|
||||
|
||||
# 200 transactions per block, 4 blocks per minute.
|
||||
estimated_events_per_time_interval: float = 5 * 800
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArgsFilters:
|
||||
name: str
|
||||
value: Any
|
||||
type: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class LabelsFilters:
|
||||
|
||||
name: str
|
||||
type: str
|
||||
args: List[ArgsFilters] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AddressFilters:
|
||||
|
||||
address: str
|
||||
label_filters: List[LabelsFilters] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Filters:
|
||||
|
||||
addresses: List[AddressFilters] = field(default_factory=list)
|
||||
|
||||
|
||||
class MoonwormProvider:
|
||||
def __init__(
|
||||
self,
|
||||
event_type: str,
|
||||
blockchain: AvailableBlockchainType,
|
||||
description: str,
|
||||
streamboaundary_range_limit: int,
|
||||
):
|
||||
self.event_type = event_type
|
||||
self.blockchain = blockchain
|
||||
self.description = description
|
||||
self.valid_period_seconds = streamboaundary_range_limit
|
||||
|
||||
def default_filters(self, subscriptions: List[BugoutResource]) -> Filters:
|
||||
"""
|
||||
Default filter strings for the given list of subscriptions.
|
||||
"""
|
||||
filters = Filters()
|
||||
for subscription in subscriptions:
|
||||
subscription_address = cast(
|
||||
Optional[str], subscription.resource_data.get("address")
|
||||
)
|
||||
if subscription_address is not None:
|
||||
|
||||
# How apply labels?
|
||||
filters.addresses.append(
|
||||
AddressFilters(address=subscription_address, label_filters=[])
|
||||
)
|
||||
else:
|
||||
logger.warn(
|
||||
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
||||
)
|
||||
return filters
|
||||
|
||||
def apply_query_filters(self, filters: Filters, query_filters: StreamQuery):
|
||||
"""
|
||||
Required to implement filters wich depends on procider
|
||||
"""
|
||||
pass
|
||||
|
||||
def events(self, row: Tuple) -> data.Event:
|
||||
"""
|
||||
Parses a result from the result set of a database query for Ethereum transactions with block timestamp
|
||||
into an Event object.
|
||||
"""
|
||||
(
|
||||
block_number,
|
||||
address,
|
||||
transaction_hash,
|
||||
label_data,
|
||||
block_timestamp,
|
||||
log_index,
|
||||
created_at,
|
||||
) = row
|
||||
return data.Event(
|
||||
event_type=self.event_type,
|
||||
event_timestamp=block_timestamp,
|
||||
event_data={
|
||||
"hash": transaction_hash,
|
||||
"block_number": block_number,
|
||||
"address": address,
|
||||
"label_data": label_data,
|
||||
"log_index": log_index,
|
||||
"created_at": created_at,
|
||||
},
|
||||
)
|
||||
|
||||
def parse_filters(
|
||||
self,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[Filters]:
|
||||
"""
|
||||
Passes raw filter strings into a Filters object which is used to construct a database query
|
||||
for ethereum transactions.
|
||||
Right now support only addresses query.
|
||||
"""
|
||||
if query.subscription_types and not any(
|
||||
subtype == self.event_type for subtype in query.subscription_types
|
||||
):
|
||||
return None
|
||||
|
||||
provider_subscriptions = user_subscriptions.get(self.event_type)
|
||||
|
||||
# If the user has no subscriptions to this event type, we do not have to return any data!
|
||||
if not provider_subscriptions:
|
||||
return None
|
||||
parsed_filters = self.default_filters(provider_subscriptions)
|
||||
|
||||
self.apply_query_filters(parsed_filters, query)
|
||||
|
||||
if not (parsed_filters.addresses):
|
||||
return None
|
||||
|
||||
return parsed_filters
|
||||
|
||||
def stream_boundary_validator(
|
||||
self, stream_boundary: data.StreamBoundary
|
||||
) -> data.StreamBoundary:
|
||||
"""
|
||||
Stream boundary validator for the events provider.
|
||||
|
||||
Checks that stream boundaries do not exceed periods of greater than 24 hours.
|
||||
|
||||
Raises an error for invalid stream boundaries, else returns None.
|
||||
"""
|
||||
valid_period_seconds = self.valid_period_seconds
|
||||
|
||||
_, stream_boundary = validate_stream_boundary(
|
||||
stream_boundary, valid_period_seconds, raise_when_invalid=True
|
||||
)
|
||||
return stream_boundary
|
||||
|
||||
def generate_events_query(
|
||||
self,
|
||||
db_session: Session,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
parsed_filters: Filters,
|
||||
) -> Query:
|
||||
"""
|
||||
Builds a database query for Ethereum transactions that occurred within the window of time that
|
||||
the given stream_boundary represents and satisfying the constraints of parsed_filters.
|
||||
"""
|
||||
|
||||
Labels = get_label_model(self.blockchain)
|
||||
|
||||
query = db_session.query(
|
||||
Labels.block_number,
|
||||
Labels.address,
|
||||
Labels.transaction_hash,
|
||||
Labels.label_data,
|
||||
Labels.block_timestamp,
|
||||
Labels.log_index,
|
||||
Labels.created_at,
|
||||
).filter(Labels.label == "moonworm")
|
||||
|
||||
if stream_boundary.include_start:
|
||||
query = query.filter(Labels.block_timestamp >= stream_boundary.start_time)
|
||||
else:
|
||||
query = query.filter(Labels.block_timestamp > stream_boundary.start_time)
|
||||
|
||||
if stream_boundary.end_time is not None:
|
||||
if stream_boundary.include_end:
|
||||
query = query.filter(Labels.block_timestamp <= stream_boundary.end_time)
|
||||
else:
|
||||
query = query.filter(Labels.block_timestamp <= stream_boundary.end_time)
|
||||
|
||||
addresses_filters = []
|
||||
|
||||
for address_filter in parsed_filters.addresses:
|
||||
labels_filters = []
|
||||
for label_filter in address_filter.label_filters:
|
||||
|
||||
labels_filters.append(
|
||||
and_(
|
||||
*(
|
||||
Labels.label_data["type"] == label_filter.type,
|
||||
Labels.label_data["name"] == label_filter.name,
|
||||
)
|
||||
)
|
||||
)
|
||||
addresses_filters.append(
|
||||
and_(
|
||||
*(
|
||||
Labels.address == address_filter.address,
|
||||
or_(*labels_filters),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
query = query.filter(or_(*addresses_filters))
|
||||
|
||||
return query
|
||||
|
||||
def get_events(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
|
||||
"""
|
||||
Returns blockchain events for the given addresses in the time period represented
|
||||
by stream_boundary.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
stream_boundary = self.stream_boundary_validator(stream_boundary)
|
||||
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
|
||||
ethereum_transactions = self.generate_events_query(
|
||||
db_session, stream_boundary, parsed_filters
|
||||
)
|
||||
|
||||
ethereum_transactions = ethereum_transactions.order_by(
|
||||
text("block_timestamp desc")
|
||||
)
|
||||
|
||||
# TODO(zomglings): Catch the operational error denoting that the statement timed out here
|
||||
# and wrap it in an error that tells the API to return the appropriate 400 response. Currently,
|
||||
# when the statement times out, the API returns a 500 status code to the client, which doesn't
|
||||
# do anything to help them get data from teh backend.
|
||||
# The error message on the API side when the statement times out:
|
||||
# > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
|
||||
events: List[data.Event] = [self.events(row) for row in ethereum_transactions]
|
||||
|
||||
if (stream_boundary.end_time is None) and events:
|
||||
stream_boundary.end_time = events[0].event_timestamp
|
||||
stream_boundary.include_end = True
|
||||
|
||||
return stream_boundary, events
|
||||
|
||||
def latest_events(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
query: StreamQuery,
|
||||
num_events: int,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[List[data.Event]]:
|
||||
"""
|
||||
Returns the num_events latest events from the current provider, subject to the constraints imposed
|
||||
by the given filters.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert num_events > 0, f"num_events ({num_events}) should be positive."
|
||||
|
||||
stream_boundary = data.StreamBoundary(
|
||||
start_time=0, include_start=True, end_time=None, include_end=False
|
||||
)
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
ethereum_transactions = (
|
||||
self.generate_events_query(db_session, stream_boundary, parsed_filters)
|
||||
.order_by(text("block_timestamp desc"))
|
||||
.limit(num_events)
|
||||
)
|
||||
|
||||
return [self.events(row) for row in ethereum_transactions]
|
||||
|
||||
def next_event(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[data.Event]:
|
||||
"""
|
||||
Returns the earliest event occuring after the given stream boundary corresponding to the given
|
||||
query from this provider.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert (
|
||||
stream_boundary.end_time is not None
|
||||
), "Cannot return next event for up-to-date stream boundary"
|
||||
next_stream_boundary = data.StreamBoundary(
|
||||
start_time=stream_boundary.end_time,
|
||||
include_start=(not stream_boundary.include_end),
|
||||
end_time=None,
|
||||
include_end=False,
|
||||
)
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
|
||||
maybe_ethereum_transaction = (
|
||||
self.generate_events_query(db_session, next_stream_boundary, parsed_filters)
|
||||
.order_by(text("block_timestamp asc"))
|
||||
.limit(1)
|
||||
).one_or_none()
|
||||
|
||||
if maybe_ethereum_transaction is None:
|
||||
return None
|
||||
return self.events(maybe_ethereum_transaction)
|
||||
|
||||
def previous_event(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[data.Event]:
|
||||
"""
|
||||
Returns the latest event occuring before the given stream boundary corresponding to the given
|
||||
query from this provider.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert (
|
||||
stream_boundary.start_time != 0
|
||||
), "Cannot return previous event for stream starting at time 0"
|
||||
previous_stream_boundary = data.StreamBoundary(
|
||||
start_time=0,
|
||||
include_start=True,
|
||||
end_time=stream_boundary.start_time,
|
||||
include_end=(not stream_boundary.include_start),
|
||||
)
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
|
||||
maybe_ethereum_transaction = (
|
||||
self.generate_events_query(
|
||||
db_session, previous_stream_boundary, parsed_filters
|
||||
)
|
||||
.order_by(text("block_timestamp desc"))
|
||||
.limit(1)
|
||||
).one_or_none()
|
||||
if maybe_ethereum_transaction is None:
|
||||
return None
|
||||
return self.events(maybe_ethereum_transaction)
|
||||
|
||||
|
||||
EthereumMoonwormProvider = MoonwormProvider(
|
||||
event_type="ethereum_smartcontract",
|
||||
blockchain=AvailableBlockchainType("ethereum"),
|
||||
description="Provider for resiving transactions from Ethereum tables.",
|
||||
streamboaundary_range_limit=2 * 60 * 60,
|
||||
)
|
||||
|
||||
PolygonMoonwormProvider = MoonwormProvider(
|
||||
event_type="polygon_smartcontract",
|
||||
blockchain=AvailableBlockchainType("polygon"),
|
||||
description="Provider for resiving transactions from Polygon tables.",
|
||||
streamboaundary_range_limit=2 * 60 * 60,
|
||||
)
|
|
@ -0,0 +1,466 @@
|
|||
from dataclasses import dataclass, field
|
||||
import logging
|
||||
from typing import cast, Dict, Any, List, Optional, Tuple
|
||||
|
||||
from bugout.app import Bugout
|
||||
from bugout.data import BugoutResource
|
||||
|
||||
from moonstreamdb.blockchain import (
|
||||
get_label_model,
|
||||
get_block_model,
|
||||
get_transaction_model,
|
||||
AvailableBlockchainType,
|
||||
)
|
||||
from sqlalchemy import or_, and_, text
|
||||
from sqlalchemy.orm import Session, Query
|
||||
|
||||
|
||||
from .. import data
|
||||
from ..stream_boundaries import validate_stream_boundary
|
||||
from ..stream_queries import StreamQuery
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.WARN)
|
||||
|
||||
|
||||
allowed_tags = ["tag:erc721"]
|
||||
|
||||
default_time_interval_seconds: int = 5 * 60
|
||||
|
||||
# 200 transactions per block, 4 blocks per minute.
|
||||
estimated_events_per_time_interval: float = 5 * 800
|
||||
|
||||
|
||||
@dataclass
|
||||
class Filters:
|
||||
"""
|
||||
ethereum_blockchain event filters act as a disjunction over queries specifying a from address
|
||||
or a to address.
|
||||
"""
|
||||
|
||||
from_addresses: List[str] = field(default_factory=list)
|
||||
to_addresses: List[str] = field(default_factory=list)
|
||||
labels: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class TransactionsProvider:
|
||||
def __init__(
|
||||
self,
|
||||
event_type: str,
|
||||
blockchain: AvailableBlockchainType,
|
||||
description: str,
|
||||
streamboaundary_range_limit: int,
|
||||
):
|
||||
self.event_type = event_type
|
||||
self.blockchain = blockchain
|
||||
self.description = description
|
||||
self.valid_period_seconds = streamboaundary_range_limit
|
||||
|
||||
def validate_subscription(
|
||||
self, subscription_resource_data: data.SubscriptionResourceData, event_type
|
||||
) -> Tuple[bool, List[str]]:
|
||||
"""
|
||||
Checks that the subscription represents a valid subscription to an Ethereum address.
|
||||
|
||||
NOTE: Currently, this function only checks that the address is a nonempty string.
|
||||
"""
|
||||
errors: List[str] = []
|
||||
if subscription_resource_data.address == "":
|
||||
errors.append("address is empty")
|
||||
|
||||
if subscription_resource_data.subscription_type_id != event_type:
|
||||
errors.append(
|
||||
f"Invalid subscription_type ({subscription_resource_data.subscription_type_id}). Expected: {event_type}."
|
||||
)
|
||||
|
||||
if errors:
|
||||
return False, errors
|
||||
return True, errors
|
||||
|
||||
def stream_boundary_validator(
|
||||
self, stream_boundary: data.StreamBoundary
|
||||
) -> data.StreamBoundary:
|
||||
"""
|
||||
Stream boundary validator for the transactions provider.
|
||||
|
||||
Checks that stream boundaries do not exceed periods of greater than 2 hours.
|
||||
|
||||
Raises an error for invalid stream boundaries, else returns None.
|
||||
"""
|
||||
valid_period_seconds = self.valid_period_seconds
|
||||
_, stream_boundary = validate_stream_boundary(
|
||||
stream_boundary, valid_period_seconds, raise_when_invalid=True
|
||||
)
|
||||
return stream_boundary
|
||||
|
||||
def default_filters(self, subscriptions: List[BugoutResource]) -> Filters:
|
||||
"""
|
||||
Default filter strings for the given list of subscriptions.
|
||||
"""
|
||||
filters = Filters()
|
||||
for subscription in subscriptions:
|
||||
subscription_address = cast(
|
||||
Optional[str], subscription.resource_data.get("address")
|
||||
)
|
||||
if subscription_address is not None:
|
||||
if subscription_address in allowed_tags:
|
||||
filters.labels.append(subscription_address.split(":")[1])
|
||||
else:
|
||||
filters.from_addresses.append(subscription_address)
|
||||
filters.to_addresses.append(subscription_address)
|
||||
else:
|
||||
logger.warn(
|
||||
f"Could not find subscription address for subscription with resource id: {subscription.id}"
|
||||
)
|
||||
return filters
|
||||
|
||||
def parse_filters(
|
||||
self,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[Filters]:
|
||||
"""
|
||||
Passes raw filter strings into a Filters object which is used to construct a database query
|
||||
for ethereum transactions.
|
||||
|
||||
Filter syntax is:
|
||||
- "from:<address>" - specifies that we want to include all transactions with "<address>" as a source
|
||||
- "to:<address>" - specifies that we want to include all transactions with "<address>" as a destination
|
||||
- "<address>" - specifies that we want to include all transactions with "<address>" as a source AND all transactions with "<address>" as a destination
|
||||
|
||||
If the given StreamQuery induces filters on this provider, returns those filters. Otherwise, returns
|
||||
None indicating that the StreamQuery does not require any data from this provider.
|
||||
"""
|
||||
if query.subscription_types and not any(
|
||||
subtype == self.event_type for subtype in query.subscription_types
|
||||
):
|
||||
return None
|
||||
|
||||
provider_subscriptions = user_subscriptions.get(self.event_type)
|
||||
|
||||
# If the user has no subscriptions to this event type, we do not have to return any data!
|
||||
if not provider_subscriptions:
|
||||
return None
|
||||
parsed_filters = self.default_filters(provider_subscriptions)
|
||||
|
||||
from_prefix_length = len("from:")
|
||||
to_prefix_length = len("to:")
|
||||
|
||||
subscribed_addresses = {
|
||||
subscription.resource_data.get("address")
|
||||
for subscription in provider_subscriptions
|
||||
if subscription.resource_data.get("address") is not None
|
||||
}
|
||||
|
||||
if query.subscriptions:
|
||||
parsed_filters.from_addresses = []
|
||||
parsed_filters.to_addresses = []
|
||||
for provider_type, raw_filter in query.subscriptions:
|
||||
if provider_type != self.event_type:
|
||||
continue
|
||||
|
||||
if raw_filter.startswith("from:"):
|
||||
address = raw_filter[from_prefix_length:]
|
||||
if address in subscribed_addresses:
|
||||
parsed_filters.from_addresses.append(address)
|
||||
elif raw_filter.startswith("to:"):
|
||||
address = raw_filter[to_prefix_length:]
|
||||
if address in subscribed_addresses:
|
||||
parsed_filters.to_addresses.append(address)
|
||||
else:
|
||||
address = raw_filter
|
||||
if address in subscribed_addresses:
|
||||
parsed_filters.from_addresses.append(address)
|
||||
parsed_filters.to_addresses.append(address)
|
||||
|
||||
if not (
|
||||
parsed_filters.from_addresses
|
||||
or parsed_filters.to_addresses
|
||||
or parsed_filters.labels
|
||||
):
|
||||
return None
|
||||
|
||||
return parsed_filters
|
||||
|
||||
def query_transactions(
|
||||
self,
|
||||
db_session: Session,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
parsed_filters: Filters,
|
||||
) -> Query:
|
||||
"""
|
||||
Builds a database query for Ethereum transactions that occurred within the window of time that
|
||||
the given stream_boundary represents and satisfying the constraints of parsed_filters.
|
||||
"""
|
||||
|
||||
Transactions = get_transaction_model(self.blockchain)
|
||||
Blocks = get_block_model(self.blockchain)
|
||||
Labels = get_label_model(self.blockchain)
|
||||
|
||||
query = db_session.query(
|
||||
Transactions.hash,
|
||||
Transactions.block_number,
|
||||
Transactions.from_address,
|
||||
Transactions.to_address,
|
||||
Transactions.gas,
|
||||
Transactions.gas_price,
|
||||
Transactions.input,
|
||||
Transactions.nonce,
|
||||
Transactions.value,
|
||||
Blocks.timestamp.label("timestamp"),
|
||||
).join(
|
||||
Blocks,
|
||||
Transactions.block_number == Blocks.block_number,
|
||||
)
|
||||
|
||||
if stream_boundary.include_start:
|
||||
query = query.filter(Blocks.timestamp >= stream_boundary.start_time)
|
||||
else:
|
||||
query = query.filter(Blocks.timestamp > stream_boundary.start_time)
|
||||
|
||||
if stream_boundary.end_time is not None:
|
||||
if stream_boundary.include_end:
|
||||
query = query.filter(Blocks.timestamp <= stream_boundary.end_time)
|
||||
else:
|
||||
query = query.filter(Blocks.timestamp <= stream_boundary.end_time)
|
||||
|
||||
# We want to take a big disjunction (OR) over ALL the filters, be they on "from" address or "to" address
|
||||
address_clauses = []
|
||||
|
||||
address_clauses.extend(
|
||||
[
|
||||
Transactions.from_address == address
|
||||
for address in parsed_filters.from_addresses
|
||||
]
|
||||
+ [
|
||||
Transactions.to_address == address
|
||||
for address in parsed_filters.to_addresses
|
||||
]
|
||||
)
|
||||
|
||||
labels_clause = []
|
||||
|
||||
if parsed_filters.labels:
|
||||
label_clause = (
|
||||
db_session.query(Labels)
|
||||
.filter(
|
||||
or_(
|
||||
*[
|
||||
Labels.label.contains(label)
|
||||
for label in list(set(parsed_filters.labels))
|
||||
]
|
||||
)
|
||||
)
|
||||
.exists()
|
||||
)
|
||||
labels_clause.append(label_clause)
|
||||
|
||||
subscriptions_clause = address_clauses + labels_clause
|
||||
|
||||
if subscriptions_clause:
|
||||
query = query.filter(or_(*subscriptions_clause))
|
||||
|
||||
return query
|
||||
|
||||
def ethereum_transaction_event(self, row: Tuple) -> data.Event:
|
||||
"""
|
||||
Parses a result from the result set of a database query for Ethereum transactions with block timestamp
|
||||
into an Event object.
|
||||
"""
|
||||
(
|
||||
hash,
|
||||
block_number,
|
||||
from_address,
|
||||
to_address,
|
||||
gas,
|
||||
gas_price,
|
||||
input,
|
||||
nonce,
|
||||
value,
|
||||
timestamp,
|
||||
) = row
|
||||
return data.Event(
|
||||
event_type=self.event_type,
|
||||
event_timestamp=timestamp,
|
||||
event_data={
|
||||
"hash": hash,
|
||||
"block_number": block_number,
|
||||
"from": from_address,
|
||||
"to": to_address,
|
||||
"gas": gas,
|
||||
"gas_price": gas_price,
|
||||
"input": input,
|
||||
"nonce": nonce,
|
||||
"value": value,
|
||||
},
|
||||
)
|
||||
|
||||
def get_events(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[Tuple[data.StreamBoundary, List[data.Event]]]:
|
||||
"""
|
||||
Returns ethereum_blockchain events for the given addresses in the time period represented
|
||||
by stream_boundary.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
stream_boundary = self.stream_boundary_validator(stream_boundary)
|
||||
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
|
||||
ethereum_transactions = self.query_transactions(
|
||||
db_session, stream_boundary, parsed_filters
|
||||
)
|
||||
|
||||
ethereum_transactions = ethereum_transactions.order_by(text("timestamp desc"))
|
||||
|
||||
# TODO(zomglings): Catch the operational error denoting that the statement timed out here
|
||||
# and wrap it in an error that tells the API to return the appropriate 400 response. Currently,
|
||||
# when the statement times out, the API returns a 500 status code to the client, which doesn't
|
||||
# do anything to help them get data from teh backend.
|
||||
# The error message on the API side when the statement times out:
|
||||
# > sqlalchemy.exc.OperationalError: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
|
||||
events: List[data.Event] = [
|
||||
self.ethereum_transaction_event(row) for row in ethereum_transactions
|
||||
]
|
||||
|
||||
if (stream_boundary.end_time is None) and events:
|
||||
stream_boundary.end_time = events[0].event_timestamp
|
||||
stream_boundary.include_end = True
|
||||
|
||||
return stream_boundary, events
|
||||
|
||||
def latest_events(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
query: StreamQuery,
|
||||
num_events: int,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[List[data.Event]]:
|
||||
"""
|
||||
Returns the num_events latest events from the current provider, subject to the constraints imposed
|
||||
by the given filters.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert num_events > 0, f"num_events ({num_events}) should be positive."
|
||||
|
||||
stream_boundary = data.StreamBoundary(
|
||||
start_time=0, include_start=True, end_time=None, include_end=False
|
||||
)
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
ethereum_transactions = (
|
||||
self.query_transactions(db_session, stream_boundary, parsed_filters)
|
||||
.order_by(text("timestamp desc"))
|
||||
.limit(num_events)
|
||||
)
|
||||
|
||||
return [self.ethereum_transaction_event(row) for row in ethereum_transactions]
|
||||
|
||||
def next_event(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[data.Event]:
|
||||
"""
|
||||
Returns the earliest event occuring after the given stream boundary corresponding to the given
|
||||
query from this provider.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert (
|
||||
stream_boundary.end_time is not None
|
||||
), "Cannot return next event for up-to-date stream boundary"
|
||||
next_stream_boundary = data.StreamBoundary(
|
||||
start_time=stream_boundary.end_time,
|
||||
include_start=(not stream_boundary.include_end),
|
||||
end_time=None,
|
||||
include_end=False,
|
||||
)
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
|
||||
maybe_ethereum_transaction = (
|
||||
self.query_transactions(db_session, next_stream_boundary, parsed_filters)
|
||||
.order_by(text("timestamp asc"))
|
||||
.limit(1)
|
||||
).one_or_none()
|
||||
|
||||
if maybe_ethereum_transaction is None:
|
||||
return None
|
||||
return self.ethereum_transaction_event(maybe_ethereum_transaction)
|
||||
|
||||
def previous_event(
|
||||
self,
|
||||
db_session: Session,
|
||||
bugout_client: Bugout,
|
||||
data_journal_id: str,
|
||||
data_access_token: str,
|
||||
stream_boundary: data.StreamBoundary,
|
||||
query: StreamQuery,
|
||||
user_subscriptions: Dict[str, List[BugoutResource]],
|
||||
) -> Optional[data.Event]:
|
||||
"""
|
||||
Returns the latest event occuring before the given stream boundary corresponding to the given
|
||||
query from this provider.
|
||||
|
||||
If the query does not require any data from this provider, returns None.
|
||||
"""
|
||||
assert (
|
||||
stream_boundary.start_time != 0
|
||||
), "Cannot return previous event for stream starting at time 0"
|
||||
previous_stream_boundary = data.StreamBoundary(
|
||||
start_time=0,
|
||||
include_start=True,
|
||||
end_time=stream_boundary.start_time,
|
||||
include_end=(not stream_boundary.include_start),
|
||||
)
|
||||
parsed_filters = self.parse_filters(query, user_subscriptions)
|
||||
if parsed_filters is None:
|
||||
return None
|
||||
maybe_ethereum_transaction = (
|
||||
self.query_transactions(
|
||||
db_session, previous_stream_boundary, parsed_filters
|
||||
)
|
||||
.order_by(text("timestamp desc"))
|
||||
.limit(1)
|
||||
).one_or_none()
|
||||
if maybe_ethereum_transaction is None:
|
||||
return None
|
||||
return self.ethereum_transaction_event(maybe_ethereum_transaction)
|
||||
|
||||
|
||||
EthereumTransactions = TransactionsProvider(
|
||||
event_type="ethereum_blockchain",
|
||||
blockchain=AvailableBlockchainType("ethereum"),
|
||||
description="Provider for resiving transactions from Ethereum tables.",
|
||||
streamboaundary_range_limit=2 * 60 * 60,
|
||||
)
|
||||
|
||||
PolygonTransactions = TransactionsProvider(
|
||||
event_type="polygon_blockchain",
|
||||
blockchain=AvailableBlockchainType("polygon"),
|
||||
description="Provider for resiving transactions from Polygon tables.",
|
||||
streamboaundary_range_limit=2 * 60 * 60,
|
||||
)
|
|
@ -20,7 +20,6 @@ from ..settings import (
|
|||
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
|
||||
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
|
||||
)
|
||||
import pprint
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -36,6 +35,8 @@ BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
|
|||
blockchain_by_subscription_id = {
|
||||
"ethereum_blockchain": "ethereum",
|
||||
"polygon_blockchain": "polygon",
|
||||
"ethereum_smartcontract": "ethereum",
|
||||
"polygon_smartcontract": "polygon",
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ from moonstreamdb import db
|
|||
from sqlalchemy.orm import Session
|
||||
|
||||
from .. import data
|
||||
from ..providers.bugout import whalewatch_provider
|
||||
from ..providers.bugout import ethereum_whalewatch_provider
|
||||
from ..settings import (
|
||||
bugout_client,
|
||||
MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
|
@ -47,14 +47,14 @@ async def stream_handler(
|
|||
include_end=include_end,
|
||||
)
|
||||
|
||||
result = whalewatch_provider.get_events(
|
||||
result = ethereum_whalewatch_provider.get_events(
|
||||
db_session=db_session,
|
||||
bugout_client=bugout_client,
|
||||
data_journal_id=MOONSTREAM_DATA_JOURNAL_ID,
|
||||
data_access_token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
stream_boundary=stream_boundary,
|
||||
user_subscriptions={whalewatch_provider.event_type: []},
|
||||
query=StreamQuery(subscription_types=[whalewatch_provider.event_type]),
|
||||
user_subscriptions={ethereum_whalewatch_provider.event_type: []},
|
||||
query=StreamQuery(subscription_types=[ethereum_whalewatch_provider.event_type]),
|
||||
)
|
||||
|
||||
if result is None:
|
||||
|
|
|
@ -13,9 +13,9 @@ setup(
|
|||
install_requires=[
|
||||
"appdirs",
|
||||
"boto3",
|
||||
"bugout",
|
||||
"bugout>=0.1.19",
|
||||
"fastapi",
|
||||
"moonstreamdb",
|
||||
"moonstreamdb>=0.2.2",
|
||||
"humbug",
|
||||
"pydantic",
|
||||
"pyevmasm",
|
||||
|
|
|
@ -12,7 +12,7 @@ from typing import Any, Callable, Dict, List, Union
|
|||
from uuid import UUID
|
||||
|
||||
import boto3 # type: ignore
|
||||
from bugout.data import BugoutResources
|
||||
from bugout.data import BugoutResource, BugoutResources
|
||||
from moonstreamdb.db import yield_db_session_ctx
|
||||
from sqlalchemy import Column, and_, func, text, distinct
|
||||
from sqlalchemy.orm import Query, Session
|
||||
|
@ -39,14 +39,16 @@ logger = logging.getLogger(__name__)
|
|||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
subscription_id_by_blockchain = {
|
||||
"ethereum": "ethereum_blockchain",
|
||||
"polygon": "polygon_blockchain",
|
||||
subscription_ids_by_blockchain = {
|
||||
"ethereum": ["ethereum_blockchain", "ethereum_smartcontract"],
|
||||
"polygon": ["polygon_blockchain", "polygon_smartcontract"],
|
||||
}
|
||||
|
||||
blockchain_by_subscription_id = {
|
||||
"ethereum_blockchain": "ethereum",
|
||||
"polygon_blockchain": "polygon",
|
||||
"ethereum_smartcontract": "ethereum",
|
||||
"polygon_smartcontract": "polygon",
|
||||
}
|
||||
|
||||
|
||||
|
@ -586,25 +588,30 @@ def stats_generate_handler(args: argparse.Namespace):
|
|||
|
||||
print(f"Amount of dashboards: {len(dashboard_resources.resources)}")
|
||||
|
||||
# Create subscriptions dict for get subscriptions by id.
|
||||
blockchain_subscriptions: BugoutResources = bc.list_resources(
|
||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
params={
|
||||
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
|
||||
"subscription_type_id": subscription_id_by_blockchain[args.blockchain],
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
# get all subscriptions
|
||||
|
||||
print(
|
||||
f"Amount of blockchain subscriptions: {len(blockchain_subscriptions.resources)}"
|
||||
)
|
||||
available_subscriptions: List[BugoutResource] = []
|
||||
|
||||
for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
|
||||
|
||||
# Create subscriptions dict for get subscriptions by id.
|
||||
blockchain_subscriptions: BugoutResources = bc.list_resources(
|
||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
params={
|
||||
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
|
||||
"subscription_type_id": subscription_type,
|
||||
},
|
||||
timeout=10,
|
||||
)
|
||||
available_subscriptions.extend(blockchain_subscriptions.resources)
|
||||
|
||||
subscription_by_id = {
|
||||
str(blockchain_subscription.id): blockchain_subscription
|
||||
for blockchain_subscription in blockchain_subscriptions.resources
|
||||
for blockchain_subscription in available_subscriptions
|
||||
}
|
||||
|
||||
print(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
|
||||
|
||||
s3_client = boto3.client("s3")
|
||||
|
||||
subscriptions_count = 0
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
from .db import yield_db_session, yield_db_session_ctx
|
||||
from .models import (
|
||||
EthereumBlock,
|
||||
EthereumLabel,
|
||||
EthereumTransaction,
|
||||
PolygonBlock,
|
||||
PolygonLabel,
|
||||
PolygonTransaction,
|
||||
)
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from typing import Type, Union
|
||||
|
||||
|
||||
class AvailableBlockchainType(Enum):
|
||||
ETHEREUM = "ethereum"
|
||||
POLYGON = "polygon"
|
||||
|
||||
|
||||
def get_block_model(
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
) -> Type[Union[EthereumBlock, PolygonBlock]]:
|
||||
"""
|
||||
Depends on provided blockchain type: Ethereum or Polygon,
|
||||
set proper blocks model: EthereumBlock or PolygonBlock.
|
||||
"""
|
||||
block_model: Type[Union[EthereumBlock, PolygonBlock]]
|
||||
if blockchain_type == AvailableBlockchainType.ETHEREUM:
|
||||
block_model = EthereumBlock
|
||||
elif blockchain_type == AvailableBlockchainType.POLYGON:
|
||||
block_model = PolygonBlock
|
||||
else:
|
||||
raise Exception("Unsupported blockchain type provided")
|
||||
|
||||
return block_model
|
||||
|
||||
|
||||
def get_label_model(
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
) -> Type[Union[EthereumLabel, PolygonLabel]]:
|
||||
"""
|
||||
Depends on provided blockchain type: Ethereum or Polygon,
|
||||
set proper block label model: EthereumLabel or PolygonLabel.
|
||||
"""
|
||||
label_model: Type[Union[EthereumLabel, PolygonLabel]]
|
||||
if blockchain_type == AvailableBlockchainType.ETHEREUM:
|
||||
label_model = EthereumLabel
|
||||
elif blockchain_type == AvailableBlockchainType.POLYGON:
|
||||
label_model = PolygonLabel
|
||||
else:
|
||||
raise Exception("Unsupported blockchain type provided")
|
||||
|
||||
return label_model
|
||||
|
||||
|
||||
def get_transaction_model(
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
) -> Type[Union[EthereumTransaction, PolygonTransaction]]:
|
||||
"""
|
||||
Depends on provided blockchain type: Ethereum or Polygon,
|
||||
set proper block transactions model: EthereumTransaction or PolygonTransaction.
|
||||
"""
|
||||
transaction_model: Type[Union[EthereumTransaction, PolygonTransaction]]
|
||||
if blockchain_type == AvailableBlockchainType.ETHEREUM:
|
||||
transaction_model = EthereumTransaction
|
||||
elif blockchain_type == AvailableBlockchainType.POLYGON:
|
||||
transaction_model = PolygonTransaction
|
||||
else:
|
||||
raise Exception("Unsupported blockchain type provided")
|
||||
|
||||
return transaction_model
|
|
@ -2,4 +2,4 @@
|
|||
Moonstream database version.
|
||||
"""
|
||||
|
||||
MOONSTREAMDB_VERSION = "0.2.1"
|
||||
MOONSTREAMDB_VERSION = "0.2.2"
|
||||
|
|
|
@ -14,6 +14,7 @@ import {
|
|||
} from "@chakra-ui/react";
|
||||
import { DEFAULT_METATAGS, AWS_ASSETS_PATH } from "../../src/core/constants";
|
||||
import UIContext from "../../src/core/providers/UIProvider/context";
|
||||
import TeamCard from "../../src/components/TeamCard";
|
||||
|
||||
const assets = {
|
||||
background720: `${AWS_ASSETS_PATH}/blog-background-720x405.png`,
|
||||
|
@ -21,6 +22,14 @@ const assets = {
|
|||
background2880: `${AWS_ASSETS_PATH}/blog-background-720x405.png`,
|
||||
background3840: `${AWS_ASSETS_PATH}/blog-background-720x405.png`,
|
||||
team: `${AWS_ASSETS_PATH}/Team-page-illustration.png`,
|
||||
dragonfly: `${AWS_ASSETS_PATH}/dragonfly.jpg`,
|
||||
ladybird: `${AWS_ASSETS_PATH}/ladybird.jpg`,
|
||||
locust: `${AWS_ASSETS_PATH}/locust.jpg`,
|
||||
mantis: `${AWS_ASSETS_PATH}/mantis.jpg`,
|
||||
centipede: `${AWS_ASSETS_PATH}/centipede.jpg`,
|
||||
spider: `${AWS_ASSETS_PATH}/spider.jpg`,
|
||||
ant: `${AWS_ASSETS_PATH}/ant.jpg`,
|
||||
firefly: `${AWS_ASSETS_PATH}/firefly.jpg`,
|
||||
};
|
||||
|
||||
const Product = () => {
|
||||
|
@ -204,62 +213,86 @@ const Product = () => {
|
|||
<Heading as="h2" size="md" w="100%" px={12} py={2} borderTopRadius="xl">
|
||||
Our engineering team
|
||||
</Heading>
|
||||
<chakra.span pl={2} px={12} py={2}>
|
||||
<UnorderedList w="75%" pl={4} spacing={2}>
|
||||
<ListItem>
|
||||
<b>zomglings{". "}</b> Founder. Number theorist. Loves playing
|
||||
chess while programming. Fan of GO, backgammon, and video games.
|
||||
</ListItem>
|
||||
<ListItem>
|
||||
<b>kompotkot{". "}</b>Keeper of Secrets. Likes information
|
||||
security since childhood, loves mountains and goes hiking from
|
||||
time to time. Had a close call with a wild bear in a forest once.
|
||||
</ListItem>
|
||||
<ListItem>
|
||||
<b>wizarikus{". "}</b>Wizard. Loves mountains, bicycling, and
|
||||
hiking. A practicing Python wizard. Also likes to cook and play
|
||||
the guitar in between data witchcraft.
|
||||
</ListItem>
|
||||
<ListItem>
|
||||
<b>peersky{". "}</b>
|
||||
{`Spectral hopper. Perceives the world as a
|
||||
spectrum interacting with and within the observer's mind. Loves
|
||||
to shift in time domain to spend some of it doing fire
|
||||
performances, surfing, and connecting with nature.`}
|
||||
</ListItem>
|
||||
<ListItem>
|
||||
<b>yhtyyar{". "}</b>
|
||||
{`Wunderkind. Interested in Math, NLP. Loves
|
||||
programming language parsing and Algorithms & Data structures.
|
||||
Implementing his own dialect of LISP programming language for
|
||||
scientific calculations.`}
|
||||
</ListItem>
|
||||
</UnorderedList>
|
||||
</chakra.span>
|
||||
<Stack
|
||||
w="100%"
|
||||
direction={"row"}
|
||||
flexWrap="wrap"
|
||||
spacing={4}
|
||||
justifyContent="space-between"
|
||||
px={12}
|
||||
placeContent={"center"}
|
||||
>
|
||||
<TeamCard
|
||||
avatarURL={assets["ant"]}
|
||||
name={"Neeraj Kashyap"}
|
||||
atName={"@zomglings"}
|
||||
content={` Founder. Number theorist. Loves playing chess while programming. Fan of GO, backgammon, and video games.`}
|
||||
/>
|
||||
<TeamCard
|
||||
avatarURL={assets["spider"]}
|
||||
name={"Sergei Sumarokov"}
|
||||
atName={"@kompotkot"}
|
||||
content={`Keeper of Secrets. Likes information
|
||||
security since childhood, loves mountains and goes hiking from
|
||||
time to time. Had a close call with a wild bear in a forest once.`}
|
||||
/>
|
||||
<TeamCard
|
||||
avatarURL={assets["locust"]}
|
||||
name={"Andrey Dolgolev"}
|
||||
atName={"@Andrei-Dolgolev"}
|
||||
content={`Wizard. Loves mountains, bicycling, and
|
||||
hiking. A practicing Python wizard. Also likes to cook and play
|
||||
the guitar in between data witchcraft.`}
|
||||
/>
|
||||
<TeamCard
|
||||
avatarURL={assets["dragonfly"]}
|
||||
name={"Tim Pechersky"}
|
||||
atName={"@peersky"}
|
||||
content={`Spectral hopper. Hes special ability to precieve world trough spectral domain. Occasionaly
|
||||
shifts in to time domain to spend some in doing flow
|
||||
arts, surfing, and connecting with nature.`}
|
||||
/>
|
||||
<TeamCard
|
||||
avatarURL={assets["centipede"]}
|
||||
name={"yhtyyar"}
|
||||
atName={"@Yhtiyar"}
|
||||
content={`Wunderkind. Interested in Math, NLP. Loves
|
||||
programming language parsing and Algorithms & Data structures.
|
||||
Implementing his own dialect of LISP programming language for
|
||||
scientific calculations.`}
|
||||
/>
|
||||
</Stack>
|
||||
</Stack>
|
||||
<Stack mx={margin} mb={12} maxW="1700px" w="100%">
|
||||
<Heading as="h2" size="md" w="100%" px={12} py={2} borderTopRadius="xl">
|
||||
Our marketing and growth team
|
||||
</Heading>
|
||||
<chakra.span pl={2} px={12} py={2}>
|
||||
<UnorderedList w="75%" pl={4} spacing={2}>
|
||||
<ListItem>
|
||||
<b>pahita{". "}</b> Dreamer. An alien who pretends to be a human.
|
||||
So far so good. Loves ecstatic dance, being alone in nature and
|
||||
dreaming.
|
||||
</ListItem>
|
||||
<ListItem>
|
||||
<b>in_technicolor{". "}</b>Mediator. Loves stand-up comedy and
|
||||
crying at nights. Volunteered at a horse farm once. Portrait
|
||||
artist, puts the pain in painting.
|
||||
</ListItem>
|
||||
<ListItem>
|
||||
<b>nanaland{". "}</b>Progress and Enthusiasm. Traveled to the
|
||||
North Korean border at the age of 19. Half German. Counseling
|
||||
psychologist who switched to tech marketing and sales.
|
||||
</ListItem>
|
||||
</UnorderedList>
|
||||
</chakra.span>
|
||||
<Stack
|
||||
w="100%"
|
||||
direction={"row"}
|
||||
flexWrap="wrap"
|
||||
spacing={4}
|
||||
justifyContent="space-between"
|
||||
px={12}
|
||||
placeContent={"center"}
|
||||
>
|
||||
<TeamCard
|
||||
avatarURL={assets["ladybird"]}
|
||||
name={"Sophia Aryan"}
|
||||
atName={"@pahita"}
|
||||
content={`Dreamer. An alien who pretends to be a human.
|
||||
So far so good. Loves ecstatic dance, being alone in nature and
|
||||
dreaming.`}
|
||||
/>
|
||||
<TeamCard
|
||||
avatarURL={assets["mantis"]}
|
||||
name={"Daria Navoloshnikova"}
|
||||
atName={"@in_technicolor"}
|
||||
content={`Mediator. Loves stand-up comedy and
|
||||
crying at nights. Volunteered at a horse farm once. Portrait
|
||||
artist, puts the pain in painting.`}
|
||||
/>
|
||||
</Stack>
|
||||
</Stack>
|
||||
</Flex>
|
||||
);
|
||||
|
|
|
@ -105,7 +105,9 @@ const NewDashboard = (props) => {
|
|||
|
||||
const filterFn = (item, inputValue) =>
|
||||
(item.subscription_type_id === "ethereum_blockchain" ||
|
||||
item.subscription_type_id === "polygon_blockchain") &&
|
||||
item.subscription_type_id === "polygon_blockchain" ||
|
||||
item.subscription_type_id === "polygon_smartcontract" ||
|
||||
item.subscription_type_id === "ethereum_smartcontract") &&
|
||||
(!inputValue ||
|
||||
item.address.toUpperCase().includes(inputValue.toUpperCase()) ||
|
||||
item.label.toUpperCase().includes(inputValue.toUpperCase()));
|
||||
|
|
|
@ -2,13 +2,21 @@ import React, { useContext } from "react";
|
|||
import { Flex, IconButton, Stack, Tooltip, chakra } from "@chakra-ui/react";
|
||||
import { ArrowRightIcon } from "@chakra-ui/icons";
|
||||
import UIContext from "../core/providers/UIProvider/context";
|
||||
import EthereumBlockchainCard from "./stream-cards/EthereumBlockchain";
|
||||
import EthereumTXPoolCard from "./stream-cards/EthereumTXPool";
|
||||
import EthereumWhalewatchCard from "./stream-cards/EthereumWhalewatch";
|
||||
import BlockchainCard from "./stream-cards/Blockchain";
|
||||
import TXPoolCard from "./stream-cards/TXPool";
|
||||
import WhalewatchCard from "./stream-cards/Whalewatch";
|
||||
import SmartcontractCard from "./stream-cards/Smartcontract";
|
||||
|
||||
const StreamEntry_ = ({ entry, showOnboardingTooltips, className }) => {
|
||||
const ui = useContext(UIContext);
|
||||
|
||||
const eventCategories = {
|
||||
blockchain: "_blockchain",
|
||||
whalewatch: "_whalewatch",
|
||||
txpool: "_txpool",
|
||||
smartcontract: "_smartcontract",
|
||||
};
|
||||
|
||||
return (
|
||||
<Flex
|
||||
className={className}
|
||||
|
@ -36,22 +44,29 @@ const StreamEntry_ = ({ entry, showOnboardingTooltips, className }) => {
|
|||
h="100%"
|
||||
spacing={0}
|
||||
>
|
||||
{entry.event_type === "ethereum_blockchain" && (
|
||||
<EthereumBlockchainCard
|
||||
{entry.event_type.includes(eventCategories.blockchain) && (
|
||||
<BlockchainCard
|
||||
entry={entry}
|
||||
showOnboardingTooltips={showOnboardingTooltips}
|
||||
/>
|
||||
)}
|
||||
|
||||
{entry.event_type === "ethereum_whalewatch" && (
|
||||
<EthereumWhalewatchCard
|
||||
{entry.event_type.includes(eventCategories.whalewatch) && (
|
||||
<WhalewatchCard
|
||||
entry={entry}
|
||||
showOnboardingTooltips={showOnboardingTooltips}
|
||||
/>
|
||||
)}
|
||||
|
||||
{entry.event_type === "ethereum_txpool" && (
|
||||
<EthereumTXPoolCard
|
||||
{entry.event_type.includes(eventCategories.txpool) && (
|
||||
<TXPoolCard
|
||||
entry={entry}
|
||||
showOnboardingTooltips={showOnboardingTooltips}
|
||||
/>
|
||||
)}
|
||||
|
||||
{entry.event_type.includes(eventCategories.smartcontract) && (
|
||||
<SmartcontractCard
|
||||
entry={entry}
|
||||
showOnboardingTooltips={showOnboardingTooltips}
|
||||
/>
|
||||
|
|
|
@ -35,6 +35,7 @@ const SubscriptionsList = ({ emptyCTA }) => {
|
|||
updateSubscription,
|
||||
deleteSubscription,
|
||||
subscriptionTypeIcons,
|
||||
subscriptionTypeNames,
|
||||
} = useSubscriptions();
|
||||
|
||||
const updateCallback = ({ id, label, color }) => {
|
||||
|
@ -79,7 +80,12 @@ const SubscriptionsList = ({ emptyCTA }) => {
|
|||
return (
|
||||
<Tr key={`token-row-${subscription.id}`}>
|
||||
<Td>
|
||||
<Tooltip label="Ethereum blockchain" fontSize="md">
|
||||
<Tooltip
|
||||
label={`${
|
||||
subscriptionTypeNames[subscription.subscription_type_id]
|
||||
}`}
|
||||
fontSize="md"
|
||||
>
|
||||
<Image h="32px" src={iconLink} alt="pool icon" />
|
||||
</Tooltip>
|
||||
</Td>
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
import React from "react";
|
||||
import {
|
||||
Heading,
|
||||
Avatar,
|
||||
Box,
|
||||
Center,
|
||||
Text,
|
||||
Stack,
|
||||
Badge,
|
||||
useColorModeValue,
|
||||
} from "@chakra-ui/react";
|
||||
|
||||
export default function SocialProfileSimple({
|
||||
avatarURL,
|
||||
avatarAlt,
|
||||
name,
|
||||
atName,
|
||||
content,
|
||||
badges,
|
||||
isOnline,
|
||||
buttons,
|
||||
}) {
|
||||
const badgeBg = useColorModeValue("gray.50", "gray.800");
|
||||
return (
|
||||
<Center py={6}>
|
||||
<Box
|
||||
maxW={"320px"}
|
||||
h="420px"
|
||||
w={"full"}
|
||||
bg={useColorModeValue("white.50", "gray.900")}
|
||||
boxShadow={"2xl"}
|
||||
rounded={"lg"}
|
||||
p={6}
|
||||
textAlign={"center"}
|
||||
>
|
||||
<Avatar
|
||||
size={"xl"}
|
||||
src={avatarURL}
|
||||
alt={avatarAlt}
|
||||
mb={4}
|
||||
pos={"relative"}
|
||||
_after={
|
||||
isOnline && {
|
||||
content: '""',
|
||||
w: 4,
|
||||
h: 4,
|
||||
bg: "green.300",
|
||||
border: "2px solid white",
|
||||
rounded: "full",
|
||||
pos: "absolute",
|
||||
bottom: 0,
|
||||
right: 3,
|
||||
}
|
||||
}
|
||||
/>
|
||||
<Heading fontSize={"2xl"} fontFamily={"body"}>
|
||||
{name}
|
||||
</Heading>
|
||||
<Text fontWeight={600} color={"gray.900"} mb={4}>
|
||||
{atName}
|
||||
</Text>
|
||||
<Text
|
||||
textAlign={"center"}
|
||||
color={useColorModeValue("blue.500", "gray.100")}
|
||||
px={3}
|
||||
>
|
||||
{content}
|
||||
</Text>
|
||||
|
||||
<Stack align={"center"} justify={"center"} direction={"row"} mt={6}>
|
||||
{badges &&
|
||||
badges.map((badgeContent, idx) => (
|
||||
<Badge
|
||||
key={`badge-card-${name}-${idx}`}
|
||||
px={2}
|
||||
py={1}
|
||||
bg={badgeBg}
|
||||
fontWeight={"400"}
|
||||
>
|
||||
{badgeContent}
|
||||
</Badge>
|
||||
))}
|
||||
</Stack>
|
||||
|
||||
<Stack mt={8} direction={"row"} spacing={4}>
|
||||
{buttons}
|
||||
</Stack>
|
||||
</Box>
|
||||
</Center>
|
||||
);
|
||||
}
|
|
@ -0,0 +1,277 @@
|
|||
import React, { useContext, useEffect, useState } from "react";
|
||||
import {
|
||||
Flex,
|
||||
Text,
|
||||
Textarea,
|
||||
Stack,
|
||||
Tooltip,
|
||||
useClipboard,
|
||||
Heading,
|
||||
Image,
|
||||
useMediaQuery,
|
||||
Spacer,
|
||||
Spinner,
|
||||
chakra,
|
||||
} from "@chakra-ui/react";
|
||||
import moment from "moment";
|
||||
import UIContext from "../../core/providers/UIProvider/context";
|
||||
import { useToast } from "../../core/hooks";
|
||||
import { useSubscriptions } from "../../core/hooks";
|
||||
|
||||
const SmartcontractCard_ = ({ entry, showOnboardingTooltips, className }) => {
|
||||
const { subscriptionsCache, subscriptionTypeIcons } = useSubscriptions();
|
||||
const ui = useContext(UIContext);
|
||||
const [copyString, setCopyString] = useState(false);
|
||||
const [icon, setIcon] = useState(null);
|
||||
const { onCopy, hasCopied } = useClipboard(copyString, 1);
|
||||
const toast = useToast();
|
||||
|
||||
useEffect(() => {
|
||||
if (hasCopied && copyString) {
|
||||
toast("Copied to clipboard", "success");
|
||||
setCopyString(false);
|
||||
} else if (copyString) {
|
||||
onCopy();
|
||||
}
|
||||
}, [copyString, onCopy, hasCopied, toast]);
|
||||
|
||||
useEffect(() => {
|
||||
if (subscriptionTypeIcons) {
|
||||
setIcon(subscriptionTypeIcons.ethereum_);
|
||||
}
|
||||
}, [subscriptionTypeIcons, setIcon]);
|
||||
|
||||
const [showFullView] = useMediaQuery(["(min-width: 420px)"]);
|
||||
if (subscriptionsCache.isLoading) return <Spinner />;
|
||||
|
||||
const transaction = {
|
||||
...entry.event_data,
|
||||
};
|
||||
|
||||
const color =
|
||||
subscriptionsCache.data.subscriptions.find((obj) => {
|
||||
return obj.address === transaction.address;
|
||||
})?.color ?? "gray.500";
|
||||
|
||||
const label =
|
||||
subscriptionsCache.data.subscriptions.find((obj) => {
|
||||
return obj.address === transaction.address;
|
||||
})?.label ?? transaction.address;
|
||||
|
||||
return (
|
||||
<Stack className={className}>
|
||||
<Tooltip
|
||||
hasArrow
|
||||
isOpen={showOnboardingTooltips}
|
||||
label="Top of card describes type of event. Ethereum blockchain in this case. It as unique hash shown here"
|
||||
variant="onboarding"
|
||||
placement="top"
|
||||
>
|
||||
<Stack
|
||||
className="title"
|
||||
direction="row"
|
||||
w="100%"
|
||||
h="1.6rem"
|
||||
minH="1.6rem"
|
||||
textAlign="center"
|
||||
spacing={0}
|
||||
alignItems="center"
|
||||
bgColor="gray.300"
|
||||
>
|
||||
{icon ? (
|
||||
<Image
|
||||
boxSize="16px"
|
||||
src={
|
||||
"https://upload.wikimedia.org/wikipedia/commons/0/05/Ethereum_logo_2014.svg"
|
||||
}
|
||||
/>
|
||||
) : (
|
||||
""
|
||||
)}
|
||||
<Heading px={1} size="xs">
|
||||
Hash
|
||||
</Heading>
|
||||
<Spacer />
|
||||
<Text
|
||||
isTruncated
|
||||
onClick={() => setCopyString(transaction.hash)}
|
||||
pr={12}
|
||||
>
|
||||
{transaction.hash}
|
||||
</Text>
|
||||
</Stack>
|
||||
</Tooltip>
|
||||
<Stack
|
||||
className="CardAddressesRow"
|
||||
direction={showFullView ? "row" : "column"}
|
||||
w="100%"
|
||||
h={showFullView ? "1.6rem" : "3.2rem"}
|
||||
minH="1.6rem"
|
||||
textAlign="center"
|
||||
spacing={0}
|
||||
alignItems="center"
|
||||
>
|
||||
<Text
|
||||
bgColor="gray.600"
|
||||
h="100%"
|
||||
py={1}
|
||||
px={2}
|
||||
w={showFullView ? null : "120px"}
|
||||
>
|
||||
address:
|
||||
</Text>
|
||||
<Tooltip label={transaction.address} aria-label="address:">
|
||||
<Text
|
||||
bgColor={color}
|
||||
isTruncated
|
||||
w="calc(100%)"
|
||||
h="100%"
|
||||
onClick={() => setCopyString(transaction.address)}
|
||||
>
|
||||
{label}
|
||||
</Text>
|
||||
</Tooltip>
|
||||
</Stack>
|
||||
<Flex flexWrap="wrap" w="100%">
|
||||
<Flex minH="2rem" minW="fit-content" flexGrow={1}>
|
||||
<Text
|
||||
h="100%"
|
||||
fontSize="sm"
|
||||
py="2px"
|
||||
px={2}
|
||||
whiteSpace="nowrap"
|
||||
w={showFullView ? null : "120px"}
|
||||
textAlign="justify"
|
||||
>
|
||||
log index:
|
||||
</Text>
|
||||
<Tooltip label={transaction.log_index} aria-label="Log index:">
|
||||
<Text
|
||||
mx={0}
|
||||
py="2px"
|
||||
fontSize="sm"
|
||||
w="calc(100%)"
|
||||
h="100%"
|
||||
onClick={() => setCopyString(transaction.log_index)}
|
||||
>
|
||||
{transaction.log_index}
|
||||
</Text>
|
||||
</Tooltip>
|
||||
</Flex>
|
||||
<Flex h="2rem" minW="fit-content" flexGrow={1}>
|
||||
<Text
|
||||
w={showFullView ? null : "120px"}
|
||||
h="100%"
|
||||
fontSize="sm"
|
||||
py="2px"
|
||||
px={2}
|
||||
textAlign="justify"
|
||||
>
|
||||
type:
|
||||
</Text>
|
||||
<Tooltip label={transaction?.label_data?.type} aria-label="type:">
|
||||
<Text
|
||||
mx={0}
|
||||
py="2px"
|
||||
fontSize="sm"
|
||||
w="calc(100%)"
|
||||
h="100%"
|
||||
onClick={() => setCopyString(transaction?.label_data?.type)}
|
||||
>
|
||||
{transaction?.label_data?.type}
|
||||
</Text>
|
||||
</Tooltip>
|
||||
</Flex>
|
||||
<Flex h="2rem" minW="fit-content" flexGrow={1}>
|
||||
<Text
|
||||
w={showFullView ? null : "120px"}
|
||||
h="100%"
|
||||
fontSize="sm"
|
||||
py="2px"
|
||||
px={2}
|
||||
textAlign="justify"
|
||||
>
|
||||
name:
|
||||
</Text>
|
||||
<Tooltip label={transaction?.label_data?.name} aria-label="Name:">
|
||||
<Text
|
||||
mx={0}
|
||||
py="2px"
|
||||
fontSize="sm"
|
||||
w="calc(100%)"
|
||||
h="100%"
|
||||
onClick={() => setCopyString(transaction?.label_data?.name)}
|
||||
>
|
||||
{transaction?.label_data?.name}
|
||||
</Text>
|
||||
</Tooltip>
|
||||
</Flex>
|
||||
|
||||
<Flex h="2rem" minW="fit-content" flexGrow={1}>
|
||||
<Text
|
||||
minW="fit-content"
|
||||
h="100%"
|
||||
fontSize="sm"
|
||||
py="2px"
|
||||
px={2}
|
||||
textAlign="justify"
|
||||
>
|
||||
data:
|
||||
</Text>
|
||||
</Flex>
|
||||
{entry.event_timestamp && (
|
||||
<Flex h="auto" minW="fit-content">
|
||||
<Text
|
||||
px={1}
|
||||
mx={0}
|
||||
py="2px"
|
||||
fontSize="sm"
|
||||
w="calc(100%)"
|
||||
h="100%"
|
||||
borderColor="gray.700"
|
||||
>
|
||||
{moment(entry.event_timestamp * 1000).format(
|
||||
"DD MMM, YYYY, HH:mm:ss"
|
||||
)}{" "}
|
||||
</Text>
|
||||
</Flex>
|
||||
)}
|
||||
</Flex>
|
||||
<Flex>
|
||||
{" "}
|
||||
<Tooltip label={transaction.value} aria-label="Value:">
|
||||
<Textarea
|
||||
mx={0}
|
||||
minH="max-content"
|
||||
py="4px"
|
||||
fontSize="sm"
|
||||
w="calc(100%)"
|
||||
value={JSON.stringify(transaction["label_data"], null, 4)}
|
||||
h="100%"
|
||||
onClick={() =>
|
||||
setCopyString(JSON.stringify(transaction["label_data"], null, 4))
|
||||
}
|
||||
></Textarea>
|
||||
</Tooltip>
|
||||
</Flex>
|
||||
</Stack>
|
||||
);
|
||||
};
|
||||
|
||||
const SmartcontractCard = chakra(SmartcontractCard_, {
|
||||
baseStyle: {
|
||||
my: 0,
|
||||
direction: "column",
|
||||
flexBasis: "10px",
|
||||
flexGrow: 1,
|
||||
borderWidth: "2px",
|
||||
borderLeftRadius: "md",
|
||||
borderColor: "gray.600",
|
||||
spacing: 0,
|
||||
h: "auto",
|
||||
overflowX: "hidden",
|
||||
overflowY: "visible",
|
||||
},
|
||||
});
|
||||
|
||||
export default SmartcontractCard;
|
|
@ -22,6 +22,10 @@ const useStream = (q, streamCache, setStreamCache, cursor, setCursor) => {
|
|||
setStreamBoundary(defaultBoundary);
|
||||
};
|
||||
|
||||
function isEmpty(obj) {
|
||||
return Object.keys(obj).length === 0;
|
||||
}
|
||||
|
||||
const updateStreamBoundaryWith = (
|
||||
extensionBoundary,
|
||||
{ ignoreStart, ignoreEnd }
|
||||
|
@ -107,7 +111,6 @@ const useStream = (q, streamCache, setStreamCache, cursor, setCursor) => {
|
|||
setEvents(newEvents.events.slice(0, newEvents.events.length));
|
||||
}
|
||||
}
|
||||
|
||||
updateStreamBoundaryWith(newEvents.stream_boundary, {});
|
||||
}
|
||||
},
|
||||
|
@ -122,7 +125,7 @@ const useStream = (q, streamCache, setStreamCache, cursor, setCursor) => {
|
|||
useQuery(
|
||||
"stream-older-events",
|
||||
() => {
|
||||
if (olderEvent) {
|
||||
if (olderEvent && !isEmpty(olderEvent)) {
|
||||
const newStreamBoundary = {
|
||||
// 5 minutes before the previous event
|
||||
start_time: olderEvent.event_timestamp - 5 * 60,
|
||||
|
@ -165,7 +168,7 @@ const useStream = (q, streamCache, setStreamCache, cursor, setCursor) => {
|
|||
useQuery(
|
||||
"stream-newest-events",
|
||||
() => {
|
||||
if (newerEvent) {
|
||||
if (newerEvent && !isEmpty(newerEvent)) {
|
||||
const newStreamBoundary = {
|
||||
// TODO(zomglings): This is a workaround to what seems to be a filter bug on `created_at:>=...` filters
|
||||
// on Bugout journals. Please look into it.
|
||||
|
|
|
@ -11,6 +11,7 @@ const useSubscriptions = () => {
|
|||
const stripe = useStripe();
|
||||
|
||||
const [subscriptionTypeIcons, setSubscriptionTypeIcons] = useState({});
|
||||
const [subscriptionTypeNames, setSubscriptionTypeNames] = useState({});
|
||||
|
||||
const getSubscriptions = async () => {
|
||||
const response = await SubscriptionsService.getSubscriptions();
|
||||
|
@ -42,11 +43,13 @@ const useSubscriptions = () => {
|
|||
|
||||
useEffect(() => {
|
||||
let icons = {};
|
||||
let display_names = {};
|
||||
if (typesCache.data) {
|
||||
typesCache.data.forEach(
|
||||
(subscriptionType) =>
|
||||
(icons[subscriptionType.id] = subscriptionType.icon_url)
|
||||
);
|
||||
typesCache.data.forEach((subscriptionType) => {
|
||||
icons[subscriptionType.id] = subscriptionType.icon_url;
|
||||
display_names[subscriptionType.id] = subscriptionType.name;
|
||||
});
|
||||
setSubscriptionTypeNames(display_names);
|
||||
setSubscriptionTypeIcons(icons);
|
||||
}
|
||||
}, [typesCache.data]);
|
||||
|
@ -95,6 +98,7 @@ const useSubscriptions = () => {
|
|||
updateSubscription,
|
||||
deleteSubscription,
|
||||
subscriptionTypeIcons,
|
||||
subscriptionTypeNames,
|
||||
};
|
||||
};
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue