diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index 6f128c64..9699257b 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field import logging import os -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import requests @@ -21,6 +21,11 @@ ENDPOINT_NOW = "/now" ENDPOINT_TOKEN = "/users/token" ENDPOINT_SUBSCRIPTIONS = "/subscriptions/" ENDPOINT_SUBSCRIPTION_TYPES = "/subscriptions/types" +ENDPOINT_STREAMS = "/streams/" +ENDPOINT_STREAMS_LATEST = "/streams/latest" +ENDPOINT_STREAMS_NEXT = "/streams/next" +ENDPOINT_STREAMS_PREVIOUS = "/streams/previous" + ENDPOINTS = [ ENDPOINT_PING, ENDPOINT_VERSION, @@ -28,6 +33,10 @@ ENDPOINTS = [ ENDPOINT_TOKEN, ENDPOINT_SUBSCRIPTIONS, ENDPOINT_SUBSCRIPTION_TYPES, + ENDPOINT_STREAMS, + ENDPOINT_STREAMS_LATEST, + ENDPOINT_STREAMS_NEXT, + ENDPOINT_STREAMS_PREVIOUS, ] @@ -263,6 +272,120 @@ class Moonstream: r.raise_for_status() return r.json() + def latest_events(self, q: str = "") -> List[Dict[str, Any]]: + """ + Returns the latest events in your stream. You can optionally provide a query parameter to + constrain the query to specific subscription types or to specific subscriptions. + + Arguments: + - q - Optional query (default is the empty string). The syntax to constrain to a particular + type of subscription is "type:". For example, to get the latest event from + your Ethereum transaction pool subscriptions, you would use "type:ethereum_txpool". + + Returns: A list of the latest events in your stream. + """ + self.requires_authorization() + query_params: Dict[str, str] = {} + if q: + query_params["q"] = q + r = self._session.get( + self.api.endpoints[ENDPOINT_STREAMS_LATEST], params=query_params + ) + r.raise_for_status() + return r.json() + + def next_event( + self, end_time: int, include_end: bool = True, q: str = "" + ) -> Optional[Dict[str, Any]]: + """ + Return the earliest event in your stream that occurred after the given end_time. + + Arguments: + - end_time - Time after which you want to retrieve the earliest event from your stream. + - include_end - If True, the result is the first event that occurred in your stream strictly + *after* the end time. If False, then you will get the first event that occurred in your + stream *on* or *after* the end time. + - q - Optional query to filter over your available subscriptions and subscription types. + + Returns: None if no event has occurred after the given end time, else returns a dictionary + representing that event. + """ + self.requires_authorization() + query_params: Dict[str, Any] = { + "end_time": end_time, + "include_end": include_end, + } + if q: + query_params["q"] = q + r = self._session.get( + self.api.endpoints[ENDPOINT_STREAMS_NEXT], params=query_params + ) + r.raise_for_status() + return r.json() + + def previous_event( + self, start_time: int, include_start: bool = True, q: str = "" + ) -> Optional[Dict[str, Any]]: + """ + Return the latest event in your stream that occurred before the given start_time. + + Arguments: + - start_time - Time before which you want to retrieve the latest event from your stream. + - include_start - If True, the result is the last event that occurred in your stream strictly + *before* the start time. If False, then you will get the last event that occurred in your + stream *on* or *before* the start time. + - q - Optional query to filter over your available subscriptions and subscription types. + + Returns: None if no event has occurred before the given start time, else returns a dictionary + representing that event. + """ + self.requires_authorization() + query_params: Dict[str, Any] = { + "start_time": start_time, + "include_start": include_start, + } + if q: + query_params["q"] = q + r = self._session.get( + self.api.endpoints[ENDPOINT_STREAMS_PREVIOUS], params=query_params + ) + r.raise_for_status() + return r.json() + + def events( + self, + start_time: int, + include_start: bool, + end_time: int, + include_end: bool, + q: str = "", + ) -> Dict[str, Any]: + """ + Return all events in your stream that occurred between the given start and end times. + + Arguments: + - start_time - Time after which you want to query your stream. + - include_start - Whether or not events that occurred exactly at the start_time should be included in the results. + - end_time - Time before which you want to query your stream. + - include_end - Whether or not events that occurred exactly at the end_time should be included in the results. + - q - Optional query to filter over your available subscriptions and subscription types. + + Returns: A dictionary representing the results of your query. + """ + self.requires_authorization() + query_params: Dict[str, Any] = { + "start_time": start_time, + "include_start": include_start, + "end_time": end_time, + "include_end": include_end, + } + if q: + query_params["q"] = q + + r = self._session.get(self.api.endpoints[ENDPOINT_STREAMS], params=query_params) + r.raise_for_status() + return r.json() + def client_from_env() -> Moonstream: """ diff --git a/clients/python/moonstream/test_client.py b/clients/python/moonstream/test_client.py index e2b9b5cd..e0c00cf6 100644 --- a/clients/python/moonstream/test_client.py +++ b/clients/python/moonstream/test_client.py @@ -99,9 +99,7 @@ class TestMoonstreamClientFromEnv(unittest.TestCase): self.assertIsNone(m.requires_authorization()) authorization_header = m._session.headers["Authorization"] - self.assertTrue(authorization_header.startswith("Bearer ")) - access_token = authorization_header[len("Bearer ") :] - self.assertEqual(access_token, self.moonstream_access_token) + self.assertEqual(authorization_header, f"Bearer {self.moonstream_access_token}") class TestMoonstreamEndpoints(unittest.TestCase): @@ -120,6 +118,10 @@ class TestMoonstreamEndpoints(unittest.TestCase): client.ENDPOINT_TOKEN: f"{self.normalized_url}{client.ENDPOINT_TOKEN}", client.ENDPOINT_SUBSCRIPTION_TYPES: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTION_TYPES}", client.ENDPOINT_SUBSCRIPTIONS: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTIONS}", + client.ENDPOINT_STREAMS: f"{self.normalized_url}{client.ENDPOINT_STREAMS}", + client.ENDPOINT_STREAMS_LATEST: f"{self.normalized_url}{client.ENDPOINT_STREAMS_LATEST}", + client.ENDPOINT_STREAMS_NEXT: f"{self.normalized_url}{client.ENDPOINT_STREAMS_NEXT}", + client.ENDPOINT_STREAMS_PREVIOUS: f"{self.normalized_url}{client.ENDPOINT_STREAMS_PREVIOUS}", }, )