From 36fb761e12a00ca0d522920365e80946d54f3380 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Sat, 21 Aug 2021 11:26:52 -0700 Subject: [PATCH] Added stream boundary validator (and tests) --- backend/moonstream/stream_boundaries.py | 38 +++++++++++++ backend/moonstream/test_stream_boundaries.py | 57 ++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 backend/moonstream/stream_boundaries.py create mode 100644 backend/moonstream/test_stream_boundaries.py diff --git a/backend/moonstream/stream_boundaries.py b/backend/moonstream/stream_boundaries.py new file mode 100644 index 00000000..b285fd01 --- /dev/null +++ b/backend/moonstream/stream_boundaries.py @@ -0,0 +1,38 @@ +""" +Utilities to work with stream boundaries. +""" +import time + +from .data import StreamBoundary + + +class InvalidStreamBoundary(Exception): + """ + Raised if a StreamBoundary object does not satisfy required constraints. + """ + + +def validate_stream_boundary( + stream_boundary: StreamBoundary, + time_difference_seconds: int, + raise_when_invalid: bool = False, +) -> bool: + """ + This function can be used by event providers to check if a stream boundary is valid according to their + requirements. + """ + start_time = stream_boundary.start_time + max_end_time = start_time + time_difference_seconds + end_time = stream_boundary.end_time + if end_time is None: + end_time = int(time.time()) + + if end_time > max_end_time: + if raise_when_invalid: + raise InvalidStreamBoundary( + f"Stream boundary start and end times must not differ by more than {time_difference_seconds} seconds:\n{stream_boundary.json()}" + ) + else: + return False + + return True diff --git a/backend/moonstream/test_stream_boundaries.py b/backend/moonstream/test_stream_boundaries.py new file mode 100644 index 00000000..44a2ca09 --- /dev/null +++ b/backend/moonstream/test_stream_boundaries.py @@ -0,0 +1,57 @@ +""" +Tests for stream boundary utilities. +""" +import unittest + +from .data import StreamBoundary +from . import stream_boundaries + + +class TestValidateStreamBoundary(unittest.TestCase): + def test_valid_stream_boundary(self): + stream_boundary = StreamBoundary( + start_time=1, end_time=5, include_start=True, include_end=True + ) + self.assertTrue( + stream_boundaries.validate_stream_boundary( + stream_boundary, 10, raise_when_invalid=False + ) + ) + + def test_invalid_stream_boundary(self): + stream_boundary = StreamBoundary( + start_time=1, end_time=5, include_start=True, include_end=True + ) + self.assertFalse( + stream_boundaries.validate_stream_boundary( + stream_boundary, 1, raise_when_invalid=False + ) + ) + + def test_invalid_stream_boundary_error(self): + stream_boundary = StreamBoundary( + start_time=1, end_time=5, include_start=True, include_end=True + ) + with self.assertRaises(stream_boundaries.InvalidStreamBoundary): + stream_boundaries.validate_stream_boundary( + stream_boundary, 1, raise_when_invalid=True + ) + + def test_unconstrainted_invalid_stream_boundary(self): + stream_boundary = StreamBoundary() + self.assertFalse( + stream_boundaries.validate_stream_boundary( + stream_boundary, 1, raise_when_invalid=False + ) + ) + + def test_unconstrained_invalid_stream_boundary_error(self): + stream_boundary = StreamBoundary() + with self.assertRaises(stream_boundaries.InvalidStreamBoundary): + stream_boundaries.validate_stream_boundary( + stream_boundary, 1, raise_when_invalid=True + ) + + +if __name__ == "__main__": + unittest.main()