Reversed event stream

pull/457/head
kompotkot 2021-11-24 11:07:48 +00:00
rodzic 45032157dd
commit 3daf597596
5 zmienionych plików z 55 dodań i 43 usunięć

Wyświetl plik

@ -1,13 +1,12 @@
""" """
Pydantic schemas for the Moonstream HTTP API Pydantic schemas for the Moonstream HTTP API
""" """
from datetime import datetime
from enum import Enum from enum import Enum
from typing import List, Optional, Dict, Any, Union from typing import Any, Dict, List, Optional, Union
from uuid import UUID from uuid import UUID
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from datetime import datetime
USER_ONBOARDING_STATE = "onboarding_state" USER_ONBOARDING_STATE = "onboarding_state"
@ -154,6 +153,7 @@ class StreamBoundary(BaseModel):
end_time: Optional[int] = None end_time: Optional[int] = None
include_start: bool = False include_start: bool = False
include_end: bool = False include_end: bool = False
reversed_time: bool = False
class Event(BaseModel): class Event(BaseModel):

Wyświetl plik

@ -24,17 +24,17 @@ if the order does not matter and you would rather emphasize speed. Only availabl
lists of events. (Default: True) lists of events. (Default: True)
""" """
from concurrent.futures import Future, ThreadPoolExecutor
import logging import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from bugout.app import Bugout from bugout.app import Bugout
from bugout.data import BugoutResource from bugout.data import BugoutResource
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from . import bugout, ethereum_blockchain
from .. import data from .. import data
from ..stream_queries import StreamQuery from ..stream_queries import StreamQuery
from . import bugout, ethereum_blockchain
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN) logger.setLevel(logging.WARN)
@ -107,9 +107,15 @@ def get_events(
else: else:
raise ReceivingEventsException(e) raise ReceivingEventsException(e)
stream_boundary = [boundary for boundary, _ in results.values()][0]
events = [event for _, event_list in results.values() for event in event_list] events = [event for _, event_list in results.values() for event in event_list]
if sort_events: if sort_events:
events.sort(key=lambda event: event.event_timestamp, reverse=True) # If stream_boundary time was reversed, so do not reverse by timestamp,
# it is already in correct oreder
events.sort(
key=lambda event: event.event_timestamp,
reverse=not stream_boundary.reversed_time,
)
return (stream_boundary, events) return (stream_boundary, events)

Wyświetl plik

@ -1,25 +1,18 @@
from dataclasses import dataclass, field
import logging import logging
from typing import cast, Dict, Any, List, Optional, Tuple from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, cast
from bugout.app import Bugout from bugout.app import Bugout
from bugout.data import BugoutResource from bugout.data import BugoutResource
from moonstreamdb.models import EthereumBlock, EthereumLabel, EthereumTransaction
from moonstreamdb.models import ( from sqlalchemy import and_, or_, text
EthereumBlock, from sqlalchemy.orm import Query, Session
EthereumTransaction,
EthereumLabel,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session, Query
from sqlalchemy.sql.functions import user from sqlalchemy.sql.functions import user
from .. import data from .. import data
from ..stream_boundaries import validate_stream_boundary from ..stream_boundaries import validate_stream_boundary
from ..stream_queries import StreamQuery from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN) logger.setLevel(logging.WARN)
@ -59,7 +52,9 @@ def validate_subscription(
return True, errors return True, errors
def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None: def stream_boundary_validator(
stream_boundary: data.StreamBoundary,
) -> data.StreamBoundary:
""" """
Stream boundary validator for the ethereum_blockchain event provider. Stream boundary validator for the ethereum_blockchain event provider.
@ -68,9 +63,10 @@ def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None:
Raises an error for invalid stream boundaries, else returns None. Raises an error for invalid stream boundaries, else returns None.
""" """
valid_period_seconds = 2 * 60 * 60 valid_period_seconds = 2 * 60 * 60
validate_stream_boundary( _, stream_boundary = validate_stream_boundary(
stream_boundary, valid_period_seconds, raise_when_invalid=True stream_boundary, valid_period_seconds, raise_when_invalid=True
) )
return stream_boundary
@dataclass @dataclass
@ -298,7 +294,7 @@ def get_events(
If the query does not require any data from this provider, returns None. If the query does not require any data from this provider, returns None.
""" """
stream_boundary_validator(stream_boundary) stream_boundary = stream_boundary_validator(stream_boundary)
parsed_filters = parse_filters(query, user_subscriptions) parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None: if parsed_filters is None:

Wyświetl plik

@ -2,6 +2,7 @@
Utilities to work with stream boundaries. Utilities to work with stream boundaries.
""" """
import time import time
from typing import Tuple
from .data import StreamBoundary from .data import StreamBoundary
@ -16,7 +17,7 @@ def validate_stream_boundary(
stream_boundary: StreamBoundary, stream_boundary: StreamBoundary,
time_difference_seconds: int, time_difference_seconds: int,
raise_when_invalid: bool = False, raise_when_invalid: bool = False,
) -> bool: ) -> Tuple[bool, StreamBoundary]:
""" """
This function can be used by event providers to check if a stream boundary is valid according to their This function can be used by event providers to check if a stream boundary is valid according to their
requirements. requirements.
@ -33,6 +34,16 @@ def validate_stream_boundary(
f"Stream boundary start and end times must not differ by more than {time_difference_seconds} seconds:\n{stream_boundary.json()}" f"Stream boundary start and end times must not differ by more than {time_difference_seconds} seconds:\n{stream_boundary.json()}"
) )
else: else:
return False return False, stream_boundary
return True # If required reversed time stream of events
if start_time > end_time:
include_start = stream_boundary.include_start
include_end = stream_boundary.include_end
stream_boundary.start_time = end_time
stream_boundary.end_time = start_time
stream_boundary.include_start = include_end
stream_boundary.include_end = include_start
stream_boundary.reversed_time = True
return True, stream_boundary

Wyświetl plik

@ -3,8 +3,8 @@ Tests for stream boundary utilities.
""" """
import unittest import unittest
from .data import StreamBoundary
from . import stream_boundaries from . import stream_boundaries
from .data import StreamBoundary
class TestValidateStreamBoundary(unittest.TestCase): class TestValidateStreamBoundary(unittest.TestCase):
@ -12,45 +12,44 @@ class TestValidateStreamBoundary(unittest.TestCase):
stream_boundary = StreamBoundary( stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True start_time=1, end_time=5, include_start=True, include_end=True
) )
self.assertTrue( valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundaries.validate_stream_boundary( stream_boundary, 10, raise_when_invalid=False
stream_boundary, 10, raise_when_invalid=False
)
) )
self.assertTrue(valid)
def test_invalid_stream_boundary(self): def test_invalid_stream_boundary(self):
stream_boundary = StreamBoundary( stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True start_time=1, end_time=5, include_start=True, include_end=True
) )
self.assertFalse( valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundaries.validate_stream_boundary( stream_boundary, 1, raise_when_invalid=False
stream_boundary, 1, raise_when_invalid=False
)
) )
self.assertFalse(valid)
def test_invalid_stream_boundary_error(self): def test_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary( stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True start_time=1, end_time=5, include_start=True, include_end=True
) )
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
with self.assertRaises(stream_boundaries.InvalidStreamBoundary): with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary( valid
stream_boundary, 1, raise_when_invalid=True
)
def test_unconstrainted_invalid_stream_boundary(self): def test_unconstrainted_invalid_stream_boundary(self):
stream_boundary = StreamBoundary() stream_boundary = StreamBoundary()
self.assertFalse( valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundaries.validate_stream_boundary( stream_boundary, 1, raise_when_invalid=False
stream_boundary, 1, raise_when_invalid=False
)
) )
self.assertFalse(valid)
def test_unconstrained_invalid_stream_boundary_error(self): def test_unconstrained_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary() stream_boundary = StreamBoundary()
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
with self.assertRaises(stream_boundaries.InvalidStreamBoundary): with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary( valid
stream_boundary, 1, raise_when_invalid=True
)
if __name__ == "__main__": if __name__ == "__main__":