Added stream boundary validator

(and tests)
pull/105/head
Neeraj Kashyap 2021-08-21 11:26:52 -07:00
rodzic 2886aed32e
commit 36fb761e12
2 zmienionych plików z 95 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,38 @@
"""
Utilities to work with stream boundaries.
"""
import time
from .data import StreamBoundary
class InvalidStreamBoundary(Exception):
"""
Raised if a StreamBoundary object does not satisfy required constraints.
"""
def validate_stream_boundary(
stream_boundary: StreamBoundary,
time_difference_seconds: int,
raise_when_invalid: bool = False,
) -> bool:
"""
This function can be used by event providers to check if a stream boundary is valid according to their
requirements.
"""
start_time = stream_boundary.start_time
max_end_time = start_time + time_difference_seconds
end_time = stream_boundary.end_time
if end_time is None:
end_time = int(time.time())
if end_time > max_end_time:
if raise_when_invalid:
raise InvalidStreamBoundary(
f"Stream boundary start and end times must not differ by more than {time_difference_seconds} seconds:\n{stream_boundary.json()}"
)
else:
return False
return True

Wyświetl plik

@ -0,0 +1,57 @@
"""
Tests for stream boundary utilities.
"""
import unittest
from .data import StreamBoundary
from . import stream_boundaries
class TestValidateStreamBoundary(unittest.TestCase):
def test_valid_stream_boundary(self):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
self.assertTrue(
stream_boundaries.validate_stream_boundary(
stream_boundary, 10, raise_when_invalid=False
)
)
def test_invalid_stream_boundary(self):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
self.assertFalse(
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
)
def test_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
def test_unconstrainted_invalid_stream_boundary(self):
stream_boundary = StreamBoundary()
self.assertFalse(
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
)
def test_unconstrained_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary()
with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
if __name__ == "__main__":
unittest.main()