Temo state of replacing infinitQuery

pull/97/head
Andrey Dolgolev 2021-08-07 18:05:06 +03:00
rodzic 1a43528ae9
commit 9d69fc950a
6 zmienionych plików z 234 dodań i 68 usunięć

Wyświetl plik

@ -14,7 +14,7 @@ from sqlalchemy.sql.expression import desc, false
from . import data
from .settings import DEFAULT_PAGE_SIZE
from .settings import DEFAULT_STREAM_TIMEINTERVAL
logger = logging.getLogger(__name__)
@ -107,10 +107,64 @@ async def get_transaction_in_blocks(
EthereumBlock.timestamp <= start_time
)
print(start_time)
future_last_transaction = (
db_session.query(
EthereumTransaction.hash,
EthereumTransaction.block_number,
EthereumTransaction.from_address,
EthereumTransaction.to_address,
EthereumTransaction.gas,
EthereumTransaction.gas_price,
EthereumTransaction.input,
EthereumTransaction.nonce,
EthereumTransaction.value,
EthereumBlock.timestamp.label("timestamp"),
)
.join(EthereumBlock)
.filter(filters)
.filter(EthereumBlock.timestamp > start_time)
.order_by(text("timestamp desc"))
.limit(1)
).one_or_none()
start_time = False
if future_last_transaction:
next_future_timestamp = future_last_transaction[-1]
else:
next_future_timestamp = None
if end_time:
ethereum_transactions = ethereum_transactions.filter(
EthereumBlock.timestamp >= end_time
)
print("end_time", end_time)
next_last_transaction = (
db_session.query(
EthereumTransaction.hash,
EthereumTransaction.block_number,
EthereumTransaction.from_address,
EthereumTransaction.to_address,
EthereumTransaction.gas,
EthereumTransaction.gas_price,
EthereumTransaction.input,
EthereumTransaction.nonce,
EthereumTransaction.value,
EthereumBlock.timestamp.label("timestamp"),
)
.join(EthereumBlock)
.filter(filters)
.filter(1628263498 > EthereumBlock.timestamp)
.order_by(text("timestamp desc"))
.limit(1)
).one_or_none()
start_time = False
print("next_last_transaction_timestamp", next_last_transaction)
if next_last_transaction:
next_last_transaction_timestamp = next_last_transaction[-1]
else:
next_last_transaction_timestamp = None
print(f"count: {ethereum_transactions.count()}")
@ -166,7 +220,12 @@ async def get_transaction_in_blocks(
)
)
return response, start_time, end_time
return (
response,
end_time,
next_future_timestamp,
next_last_transaction_timestamp,
)
def database_search_query(q: str, allowed_addresses: List[str]):

Wyświetl plik

@ -131,6 +131,8 @@ class EthereumTransactionResponse(BaseModel):
stream: List[EthereumTransactionItem]
start_time: int
end_time: int
next_future_timestamp: Optional[int] = None
next_past_transaction_timestamp: Optional[int] = None
class TxinfoEthereumBlockchainRequest(BaseModel):

Wyświetl plik

@ -92,8 +92,9 @@ async def search_transactions(
print("address_to_subscriptions")
(
transactions_in_blocks,
first_item_time,
last_item_time,
end_time,
next_future_timestamp,
next_last_transaction_timestamp,
) = await actions.get_transaction_in_blocks(
db_session=db_session,
query=q,
@ -104,6 +105,11 @@ async def search_transactions(
transactions.extend(transactions_in_blocks)
return data.EthereumTransactionResponse(
stream=transactions, start_time=first_item_time, end_time=last_item_time
stream=transactions,
next_future_timestamp=next_future_timestamp,
next_past_transaction_timestamp=next_last_transaction_timestamp,
end_time=end_time,
start_time=start_time,
)

Wyświetl plik

@ -40,4 +40,4 @@ for path in MOONSTREAM_OPENAPI_LIST:
DOCS_PATHS[f"/{path}/{DOCS_TARGET_PATH}"] = "GET"
DOCS_PATHS[f"/{path}/{DOCS_TARGET_PATH}/openapi.json"] = "GET"
DEFAULT_PAGE_SIZE = 10
DEFAULT_STREAM_TIMEINTERVAL = 60 * 60

Wyświetl plik

@ -78,22 +78,40 @@ const EntriesNavigation = () => {
const loadMoreButtonRef = useRef(null);
const { fetchMore, isFetchingMore, canFetchMore, EntriesPages, isLoading } =
useStream({
pageSize,
refreshRate: 1500,
searchQuery: ui.searchTerm,
enabled: isStreamOn,
isContent: false,
});
// const {
// fetchPreviousPage,
// isFetchingMore,
// hasPreviousPage,
// EntriesPages,
// isLoading,
// hasNextPage,
// fetchNextPage,
// } = useStream({
// pageSize,
// refreshRate: 1500,
// searchQuery: ui.searchTerm,
// enabled: isStreamOn,
// isContent: false,
// });
const { EntriesPages, isLoading, refetch } = useStream({
refreshRate: 1500,
searchQuery: ui.searchTerm,
start_time,
end_time,
include_start,
include_end,
enabled: isStreamOn,
isContent: false,
});
const handleScroll = ({ currentTarget }) => {
if (
currentTarget.scrollTop + currentTarget.clientHeight >=
0.5 * currentTarget.scrollHeight
) {
if (!isFetchingMore && canFetchMore) {
fetchMore();
if (!isLoading && hasPreviousPage) {
fetchPreviousPage();
}
}
};
@ -439,10 +457,10 @@ const EntriesNavigation = () => {
filterConstants={{ DIRECTIONS, CONDITION, FILTER_TYPES }}
/>
))}
{canFetchMore && !isFetchingMore && (
{hasPreviousPage && !isFetchingMore && (
<Center>
<Button
onClick={() => fetchMore()}
onClick={() => fetchPreviousPage()}
variant="outline"
colorScheme="suggested"
>
@ -450,7 +468,7 @@ const EntriesNavigation = () => {
</Button>
</Center>
)}
{canFetchMore && isFetchingMore && (
{hasPreviousPage && isFetchingMore && (
<Center>
<Spinner
hidden={!isFetchingMore}

Wyświetl plik

@ -3,75 +3,156 @@ import { queryCacheProps } from "./hookCommon";
import { SubscriptionsService } from "../services";
import moment from "moment";
// const useJournalEntries = ({
// refreshRate,
// isContent,
// pageSize,
// searchQuery,
// enabled,
// }) => {
// //const limit = pageSize ? pageSize : 25;
// const getStream =
// (searchTerm) =>
// async ({
// pageParam = {
// start_time: moment().unix(),
// },
// }) => {
// console.log("pageParam", pageParam);
// console.log("moment().unix()", moment().unix());
// const response = await SubscriptionsService.getStream({
// searchTerm: searchTerm,
// start_time: pageParam.start_time,
// end_time: pageParam.start_time - 1000,
// });
// const newEntryList = response.data.stream.map((entry) => ({
// ...entry,
// }));
// console.log("response.data", response.data);
// return {
// data: [...newEntryList],
// pageParams: {
// next_future_timestamp: response.data.next_future_timestamp,
// next_past_transaction_timestamp:
// response.data.next_past_transaction_timestamp,
// start_time: response.data.start_time,
// end_time: response.data.end_time,
// },
// };
// };
// const {
// data: EntriesPages,
// isFetchingMore,
// isLoading,
// hasPreviousPage,
// fetchPreviousPage,
// hasNextPage,
// fetchNextPage,
// refetch,
// } = useInfiniteQuery(["stream", { searchQuery }], getStream(searchQuery), {
// refetchInterval: refreshRate,
// ...queryCacheProps,
// getNextPageParam: (lastGroup) => {
// return {
// start_time: moment().unix(),
// };
// },
// getPreviousPageParam: (lastGroup) => {
// return {
// start_time: lastGroup.pageParams.next_past_transaction_timestamp,
// };
// },
// onSuccess: () => {},
// enabled: !!enabled,
// });
// return {
// EntriesPages,
// hasPreviousPage,
// fetchPreviousPage,
// hasNextPage,
// fetchNextPage,
// isFetchingMore,
// refetch,
// isLoading,
// };
// };
const useJournalEntries = ({
refreshRate,
isContent,
pageSize,
searchQuery,
start_time,
end_time,
include_start,
include_end,
enabled,
}) => {
//const limit = pageSize ? pageSize : 25;
// set our get method
const getStream =
(searchTerm) =>
async ({ pageParam = { start_time: 0, end_time: 0 } }) => {
console.log("pageParam", pageParam);
(searchTerm, start_time, end_time, include_start, include_end) =>
async () => {
// Request with params to streams
const response = await SubscriptionsService.getStream({
searchTerm: searchTerm,
start_time: pageParam.start_time,
end_time: pageParam.end_time,
start_time: start_time,
end_time: end_time,
include_start: include_start,
include_end: include_end,
});
const newEntryList = response.data.stream.map((entry) => ({
...entry,
// new events from stream
const newEventsList = response.data.stream.map((event) => ({
...event,
}));
console.log("response.data", response.data);
return {
data: [...newEntryList],
data: [...newEventsList],
pageParams: {
start_time: response.data.start_time,
end_time: response.data.end_time,
// timeinterval
start_time: response.data.start_time, // from old
end_time: response.data.end_time, // to new
// closes available transactions
next_event_time: response.data.next_event_time,
previous_event_time: response.data.previous_event_time,
// boundaries
include_start: response.data.include_start,
include_end: response.data.include_end,
},
};
};
const {
data: EntriesPages,
isFetchingMore,
isLoading,
fetchNextPage,
fetchPreviousPage,
hasNextPage,
canFetchMore,
fetchMore,
refetch,
} = useInfiniteQuery(["stream", { searchQuery }], getStream(searchQuery), {
refetchInterval: refreshRate,
...queryCacheProps,
getNextPageParam: (lastGroup) => {
console.log("lastGroup", lastGroup);
console.log("canFetchMore", canFetchMore);
console.log("fetchMore", fetchMore);
console.log("fetchNextPage", fetchNextPage);
console.log("fetchPreviousPage", fetchPreviousPage);
console.log("hasNextPage", hasNextPage);
return 1;
},
onSuccess: () => {},
enabled: !!enabled,
});
const {
return {
EntriesPages,
fetchMore,
isFetchingMore,
canFetchMore,
refetch,
isLoading,
};
data,
isLoading,
refetch,
} = useQuery(["stream", { searchQuery }], getStream(searchQuery,
start_time,
end_time,
include_start,
include_end ),
{
refetchInterval: refreshRate,
...queryCacheProps,
onSuccess: () => {},
enabled: !!enabled,
});
return {
EntriesPages: data,
isLoading,
refetch,
};
};
};
export default useJournalEntries;