diff --git a/backend/moonstream/actions.py b/backend/moonstream/actions.py new file mode 100644 index 00000000..8e29aa95 --- /dev/null +++ b/backend/moonstream/actions.py @@ -0,0 +1,205 @@ +from datetime import datetime +import logging + + +from typing import Dict, Any, List, Optional, Union +from moonstreamdb.models import ( + EthereumBlock, + EthereumTransaction, + EthereumPendingTransaction, +) +from sqlalchemy import or_, and_, text +from sqlalchemy.orm import Session +from sqlalchemy.sql.expression import desc, false + +from . import data + +from .settings import DEFAULT_PAGE_SIZE + + +logger = logging.getLogger(__name__) + + +async def get_transaction_in_blocks( + db_session: Session, + query: str, + user_subscriptions_resources_by_address: Dict[str, Any], + start_time: Optional[int] = 0, + end_time: Optional[int] = 0, +) -> List[data.EthereumTransactionItem]: + + subscriptions_addresses = list(user_subscriptions_resources_by_address.keys()) + + if start_time < 1438215988: # first block + start_time = False + + if end_time < 1438215988: # first block + end_time = False + + if query == "" or query == " ": + + filters = [ + or_( + EthereumTransaction.to_address == address, + EthereumTransaction.from_address == address, + ) + for address in subscriptions_addresses + ] + filters = or_(*filters) + + else: + filters = database_search_query( + query, allowed_addresses=subscriptions_addresses + ) + if not filters: + return [], None, None + filters = and_(*filters) + + # Get start point + if start_time is False and end_time is False: + ethereum_transaction_start_point = ( + db_session.query( + EthereumTransaction.hash, + EthereumTransaction.block_number, + EthereumTransaction.from_address, + EthereumTransaction.to_address, + EthereumTransaction.gas, + EthereumTransaction.gas_price, + EthereumTransaction.input, + EthereumTransaction.nonce, + EthereumTransaction.value, + EthereumBlock.timestamp.label("timestamp"), + ) + .join(EthereumBlock) + .filter(filters) + .order_by(text("timestamp desc")) + .limit(1) + ).one_or_none() + start_time = False + print(ethereum_transaction_start_point) + end_time = ethereum_transaction_start_point[-1] + + ethereum_transactions = ( + db_session.query( + EthereumTransaction.hash, + EthereumTransaction.block_number, + EthereumTransaction.from_address, + EthereumTransaction.to_address, + EthereumTransaction.gas, + EthereumTransaction.gas_price, + EthereumTransaction.input, + EthereumTransaction.nonce, + EthereumTransaction.value, + EthereumBlock.timestamp.label("timestamp"), + ) + .join(EthereumBlock) + .filter(filters) + ) + + print(f"last record: {end_time}") + + if start_time and end_time: + if start_time < end_time: + start_time, end_time = end_time, start_time + + if start_time: + ethereum_transactions = ethereum_transactions.filter( + EthereumBlock.timestamp <= start_time + ) + + if end_time: + ethereum_transactions = ethereum_transactions.filter( + EthereumBlock.timestamp >= end_time + ) + + print(f"count: {ethereum_transactions.count()}") + + response = [] + for row_index, ( + hash, + block_number, + from_address, + to_address, + gas, + gas_price, + input, + nonce, + value, + timestamp, + ) in enumerate(ethereum_transactions): + + subscription_type_id = None + from_label = None + to_label = None + color = None + + if from_address in subscriptions_addresses: + from_label = user_subscriptions_resources_by_address[from_address]["label"] + subscription_type_id = user_subscriptions_resources_by_address[ + from_address + ]["subscription_type_id"] + color = user_subscriptions_resources_by_address[from_address]["color"] + + if to_address in subscriptions_addresses: + subscription_type_id = user_subscriptions_resources_by_address[to_address][ + "subscription_type_id" + ] + to_label = user_subscriptions_resources_by_address[to_address]["label"] + color = user_subscriptions_resources_by_address[to_address]["color"] + + response.append( + data.EthereumTransactionItem( + color=color, + from_label=from_label, + to_label=to_label, + block_number=block_number, + gas=gas, + gasPrice=gas_price, + value=value, + from_address=from_address, + to_address=to_address, + hash=hash, + input=input, + nonce=nonce, + timestamp=timestamp, + subscription_type_id=subscription_type_id, + ) + ) + + return response, start_time, end_time + + +def database_search_query(q: str, allowed_addresses: List[str]): + + filters = q.split("+") + constructed_filters = [] + for filter_item in filters: + if filter_item == "": + logger.warning("Skipping empty filter item") + continue + + # Try Google style search filters + components = filter_item.split(":") + if len(components) == 2: + filter_type = components[0] + filter_value = components[1] + else: + continue + + if filter_type == "to" and filter_value: + constructed_filters.append(EthereumTransaction.to_address == filter_value) + + if filter_type == "from" and filter_value: + if filter_value not in allowed_addresses: + continue + constructed_filters.append(EthereumTransaction.from_address == filter_value) + + if filter_type == "address" and filter_value: + constructed_filters.append( + or_( + EthereumTransaction.to_address == filter_value, + EthereumTransaction.from_address == filter_value, + ) + ) + + return constructed_filters diff --git a/backend/moonstream/data.py b/backend/moonstream/data.py index 9677b75e..75d03f3a 100644 --- a/backend/moonstream/data.py +++ b/backend/moonstream/data.py @@ -114,6 +114,7 @@ class EthereumTransactionItem(BaseModel): color: Optional[str] from_label: Optional[str] = None to_label: Optional[str] = None + block_number: Optional[int] = None gas: int gasPrice: int value: int @@ -128,6 +129,8 @@ class EthereumTransactionItem(BaseModel): class EthereumTransactionResponse(BaseModel): stream: List[EthereumTransactionItem] + start_time: int + end_time: int class TxinfoEthereumBlockchainRequest(BaseModel): diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index deb0b8a5..b52aaabb 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -5,28 +5,22 @@ import logging from typing import Any, cast, Dict, List, Optional, Set, Union from pydantic.utils import to_camel -from sqlalchemy.engine.base import Transaction +from datetime import datetime, timedelta -from bugout.data import BugoutResource, BugoutResources +from bugout.data import BugoutResources from bugout.exceptions import BugoutResponseException -from fastapi import Body, FastAPI, HTTPException, Request, Form, Query, Depends +from fastapi import FastAPI, HTTPException, Request, Form, Query, Depends from fastapi.middleware.cors import CORSMiddleware -from moonstreamdb.models import ( - EthereumBlock, - EthereumTransaction, - EthereumPendingTransaction, - ESDFunctionSignature, - ESDEventSignature, -) from moonstreamdb import db from sqlalchemy.orm import Session -from sqlalchemy import or_, and_ +from .. import actions from .. import data from ..middleware import BroodAuthMiddleware from ..settings import ( MOONSTREAM_APPLICATION_ID, + DEFAULT_PAGE_SIZE, DOCS_TARGET_PATH, ORIGINS, DOCS_PATHS, @@ -67,9 +61,8 @@ app.add_middleware(BroodAuthMiddleware, whitelist=whitelist_paths) async def search_transactions( request: Request, q: str = Query(""), - filters: Optional[List[str]] = Query(None), - limit: int = Query(10), - offset: int = Query(0), + start_time: Optional[int] = Query(0), # Optional[int] = Query(0), # + end_time: Optional[int] = Query(0), # Optional[int] = Query(0), # db_session: Session = Depends(db.yield_db_session), ): @@ -88,136 +81,29 @@ async def search_transactions( except Exception as e: raise HTTPException(status_code=500) - subscriptions_addresses = [ - resource.resource_data["address"] - for resource in user_subscriptions_resources.resources - ] - - if q == "" or q == " ": - - filters = [ - or_( - EthereumTransaction.to_address == address, - EthereumTransaction.from_address == address, - ) - for address in subscriptions_addresses - ] - filters = or_(*filters) - - else: - filters = database_search_query(q, allowed_addresses=subscriptions_addresses) - if not filters: - return data.EthereumTransactionResponse(stream=[]) - filters = and_(*filters) - address_to_subscriptions = { resource.resource_data["address"]: resource.resource_data for resource in user_subscriptions_resources.resources } - ethereum_transactions = ( - db_session.query( - EthereumTransaction.hash, - EthereumTransaction.block_number, - EthereumTransaction.from_address, - EthereumTransaction.to_address, - EthereumTransaction.gas, - EthereumTransaction.gas_price, - EthereumTransaction.input, - EthereumTransaction.nonce, - EthereumTransaction.value, - EthereumBlock.timestamp, + transactions: List[Any] = [] + + if address_to_subscriptions: + print("address_to_subscriptions") + ( + transactions_in_blocks, + first_item_time, + last_item_time, + ) = await actions.get_transaction_in_blocks( + db_session=db_session, + query=q, + user_subscriptions_resources_by_address=address_to_subscriptions, + start_time=start_time, + end_time=end_time, ) - .join(EthereumBlock) - .filter(filters) - .limit(25) + + transactions.extend(transactions_in_blocks) + + return data.EthereumTransactionResponse( + stream=transactions, start_time=first_item_time, end_time=last_item_time ) - - response = [] - for ( - hash, - block_number, - from_address, - to_address, - gas, - gas_price, - input, - nonce, - value, - timestamp, - ) in ethereum_transactions: - - subscription_type_id = None - from_label = None - to_label = None - color = None - - if from_address in subscriptions_addresses: - from_label = address_to_subscriptions[from_address]["label"] - subscription_type_id = address_to_subscriptions[from_address][ - "subscription_type_id" - ] - color = address_to_subscriptions[from_address]["color"] - - if to_address in subscriptions_addresses: - subscription_type_id = address_to_subscriptions[to_address][ - "subscription_type_id" - ] - to_label = address_to_subscriptions[to_address]["label"] - color = address_to_subscriptions[to_address]["color"] - - response.append( - data.EthereumTransactionItem( - color=color, - from_label=from_label, - to_label=to_label, - gas=gas, - gasPrice=gas_price, - value=value, - from_address=from_address, - to_address=to_address, - hash=hash, - input=input, - nonce=nonce, - timestamp=timestamp, - subscription_type_id="1", - ) - ) - - return data.EthereumTransactionResponse(stream=response) - - -def database_search_query(q: str, allowed_addresses: List[str]): - - filters = q.split("+") - constructed_filters = [] - for filter_item in filters: - if filter_item == "": - logger.warning("Skipping empty filter item") - continue - - # Try Google style search filters - components = filter_item.split(":") - if len(components) == 2: - filter_type = components[0] - filter_value = components[1] - else: - continue - - if filter_type == "to" and filter_value: - constructed_filters.append(EthereumTransaction.to_address == filter_value) - - if filter_type == "from" and filter_value: - if filter_value not in allowed_addresses: - continue - constructed_filters.append(EthereumTransaction.from_address == filter_value) - - if filter_type == "address" and filter_value: - constructed_filters.append( - or_( - EthereumTransaction.to_address == filter_value, - EthereumTransaction.from_address == filter_value, - ) - ) - - return constructed_filters diff --git a/backend/moonstream/settings.py b/backend/moonstream/settings.py index 7ad5bdc0..ee7603c5 100644 --- a/backend/moonstream/settings.py +++ b/backend/moonstream/settings.py @@ -39,3 +39,5 @@ DOCS_PATHS = {} for path in MOONSTREAM_OPENAPI_LIST: DOCS_PATHS[f"/{path}/{DOCS_TARGET_PATH}"] = "GET" DOCS_PATHS[f"/{path}/{DOCS_TARGET_PATH}/openapi.json"] = "GET" + +DEFAULT_PAGE_SIZE = 10 diff --git a/frontend/src/components/StreamEntry.js b/frontend/src/components/StreamEntry.js index a17c7ccb..9ee6ffb5 100644 --- a/frontend/src/components/StreamEntry.js +++ b/frontend/src/components/StreamEntry.js @@ -42,7 +42,6 @@ const StreamEntry = ({ entry, filterCallback, filterConstants }) => { }; const [showFullView] = useMediaQuery(["(min-width: 420px)"]); - console.log(entry); return ( { - const limit = pageSize ? pageSize : 25; + //const limit = pageSize ? pageSize : 25; const getStream = (searchTerm) => - async ({ pageParam = 0 }) => { - if (!pageParam) { - pageParam = 0; - } + async ({ pageParam = { start_time: 0, end_time: 0 } }) => { + console.log("pageParam", pageParam); const response = await SubscriptionsService.getStream({ - searchTerm, - isContent, - limit, - offset: pageParam, + searchTerm: searchTerm, + start_time: pageParam.start_time, + end_time: pageParam.end_time, }); + const newEntryList = response.data.stream.map((entry) => ({ ...entry, })); + + console.log("response.data", response.data); return { data: [...newEntryList], pageParams: { - pageParam: pageParam + 1, - next_offset: response.data.next_offset, - total_results: response.data.total_results, - offset: response.data.offset, + start_time: response.data.start_time, + end_time: response.data.end_time, }, }; }; @@ -42,6 +41,9 @@ const useJournalEntries = ({ data: EntriesPages, isFetchingMore, isLoading, + fetchNextPage, + fetchPreviousPage, + hasNextPage, canFetchMore, fetchMore, refetch, @@ -49,7 +51,14 @@ const useJournalEntries = ({ refetchInterval: refreshRate, ...queryCacheProps, getNextPageParam: (lastGroup) => { - return lastGroup.next_offset === null ? false : lastGroup.next_offset; + console.log("lastGroup", lastGroup); + console.log("canFetchMore", canFetchMore); + console.log("fetchMore", fetchMore); + console.log("fetchNextPage", fetchNextPage); + console.log("fetchPreviousPage", fetchPreviousPage); + console.log("hasNextPage", hasNextPage); + + return 1; }, onSuccess: () => {}, enabled: !!enabled, diff --git a/frontend/src/core/services/subscriptions.service.js b/frontend/src/core/services/subscriptions.service.js index 14903bde..74f511cc 100644 --- a/frontend/src/core/services/subscriptions.service.js +++ b/frontend/src/core/services/subscriptions.service.js @@ -3,15 +3,14 @@ import { http } from "../utils"; const API = process.env.NEXT_PUBLIC_MOONSTREAM_API_URL; -export const getStream = ({ searchTerm, limit, offset, isContent }) => +export const getStream = ({ searchTerm, start_time, end_time }) => http({ method: "GET", url: `${API}/streams/`, params: { q: searchTerm, - limit: encodeURIComponent(limit), - offset: encodeURIComponent(offset), - content: encodeURIComponent(isContent), + start_time: encodeURIComponent(start_time), + end_time: encodeURIComponent(end_time), }, });