From 43535cae50edb1e23a8d711f1ac1abfc1b896c3e Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 26 Oct 2022 11:29:25 +0000 Subject: [PATCH 1/7] New python client query api based --- clients/python/.gitignore | 5 + clients/python/moonstream/client.py | 598 ++++++----------------- clients/python/moonstream/data.py | 47 ++ clients/python/moonstream/exceptions.py | 24 + clients/python/moonstream/test_client.py | 168 ++----- clients/python/moonstream/version.py | 2 +- clients/python/sample.env | 3 + clients/python/setup.py | 2 +- 8 files changed, 268 insertions(+), 581 deletions(-) create mode 100644 clients/python/moonstream/data.py create mode 100644 clients/python/moonstream/exceptions.py create mode 100644 clients/python/sample.env diff --git a/clients/python/.gitignore b/clients/python/.gitignore index 04290913..241c9e30 100644 --- a/clients/python/.gitignore +++ b/clients/python/.gitignore @@ -143,5 +143,10 @@ cython_debug/ # End of https://www.toptal.com/developers/gitignore/api/python +# Custom .moonstream-py/ .venv/ +.secrets/ +prod.env +dev.env +test.env diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index 827a6a49..abc7b62d 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -1,41 +1,28 @@ -import logging -import os -import time -from dataclasses import dataclass, field -from typing import Any, Dict, Generator, List, Optional, Tuple +import uuid +from typing import Any, Dict, Union import requests -from .version import MOONSTREAM_CLIENT_VERSION - -logger = logging.getLogger(__name__) -log_level = logging.INFO -if os.environ.get("DEBUG", "").lower() in ["true", "1"]: - log_level = logging.DEBUG -logger.setLevel(log_level) +from .data import ( + APISpec, + AuthType, + Method, + MoonstreamQueries, + MoonstreamQuery, + MoonstreamQueryResultUrl, +) +from .exceptions import MoonstreamResponseException, MoonstreamUnexpectedResponse ENDPOINT_PING = "/ping" ENDPOINT_VERSION = "/version" ENDPOINT_NOW = "/now" -ENDPOINT_TOKEN = "/users/token" -ENDPOINT_SUBSCRIPTIONS = "/subscriptions/" -ENDPOINT_SUBSCRIPTION_TYPES = "/subscriptions/types" -ENDPOINT_STREAMS = "/streams/" -ENDPOINT_STREAMS_LATEST = "/streams/latest" -ENDPOINT_STREAMS_NEXT = "/streams/next" -ENDPOINT_STREAMS_PREVIOUS = "/streams/previous" +ENDPOINT_QUERIES = "/queries" ENDPOINTS = [ ENDPOINT_PING, ENDPOINT_VERSION, ENDPOINT_NOW, - ENDPOINT_TOKEN, - ENDPOINT_SUBSCRIPTIONS, - ENDPOINT_SUBSCRIPTION_TYPES, - ENDPOINT_STREAMS, - ENDPOINT_STREAMS_LATEST, - ENDPOINT_STREAMS_NEXT, - ENDPOINT_STREAMS_PREVIOUS, + ENDPOINT_QUERIES, ] @@ -43,473 +30,178 @@ def moonstream_endpoints(url: str) -> Dict[str, str]: """ Creates a dictionary of Moonstream API endpoints at the given Moonstream API URL. """ - url_with_protocol = url - if not ( - url_with_protocol.startswith("http://") - or url_with_protocol.startswith("https://") - ): - url_with_protocol = f"http://{url_with_protocol}" + if not (url.startswith("http://") or url.startswith("https://")): + url = f"http://{url}" - normalized_url = url_with_protocol.rstrip("/") + normalized_url = url.rstrip("/") return {endpoint: f"{normalized_url}{endpoint}" for endpoint in ENDPOINTS} -class UnexpectedResponse(Exception): - """ - Raised when a server response cannot be parsed into the appropriate/expected Python structure. - """ - - -class Unauthenticated(Exception): - """ - Raised when a user tries to make a request that needs to be authenticated by they are not authenticated. - """ - - -@dataclass(frozen=True) -class APISpec: - url: str - endpoints: Dict[str, str] - - class Moonstream: """ A Moonstream client configured to communicate with a given Moonstream API server. """ def __init__( - self, - url: str = "https://api.moonstream.to", - timeout: Optional[float] = None, + self, moonstream_api_url: str = "https://api.moonstream.to", timeout: float = 1 ): """ Initializes a Moonstream API client. - Arguments: - url - Moonstream API URL. By default this points to the production Moonstream API at https://api.moonstream.to, - but you can replace it with the URL of any other Moonstream API instance. - timeout - Timeout (in seconds) for Moonstream API requests. Default is None, which means that - Moonstream API requests will never time out. - - Returns: A Moonstream client. + Arguments: + url - Moonstream API URL. By default this points to the production Moonstream API at https://api.moonstream.to, + but you can replace it with the URL of any other Moonstream API instance. """ - endpoints = moonstream_endpoints(url) - self.api = APISpec(url=url, endpoints=endpoints) + endpoints = moonstream_endpoints(moonstream_api_url) + self.api = APISpec(url=moonstream_api_url, endpoints=endpoints) self.timeout = timeout - self._session = requests.Session() - self._session.headers.update( - { - "User-Agent": f"Moonstream Python client (version {MOONSTREAM_CLIENT_VERSION})" - } - ) + + def _call(self, method: Method, url: str, **kwargs): + try: + response = requests.request(method.value, url=url, **kwargs) + response.raise_for_status() + except requests.exceptions.RequestException as err: + r = err.response + if not err.response: + # Connection errors, timeouts, etc... + raise MoonstreamResponseException( + "Network error", status_code=599, detail=str(err) + ) + if r.headers.get("Content-Type") == "application/json": + exception_detail = r.json()["detail"] + else: + exception_detail = r.text + raise MoonstreamResponseException( + "An exception occurred at Bugout API side", + status_code=r.status_code, + detail=exception_detail, + ) + except Exception as e: + raise MoonstreamUnexpectedResponse(str(e)) + return response.json() def ping(self) -> Dict[str, Any]: """ Checks that you have a connection to the Moonstream API. """ - r = self._session.get(self.api.endpoints[ENDPOINT_PING]) - r.raise_for_status() - return r.json() + result = self._call(method=Method.GET, url=self.api.endpoints[ENDPOINT_PING]) + return result def version(self) -> Dict[str, Any]: """ Gets the Moonstream API version information from the server. """ - r = self._session.get(self.api.endpoints[ENDPOINT_VERSION]) - r.raise_for_status() - return r.json() + result = self._call(method=Method.GET, url=self.api.endpoints[ENDPOINT_VERSION]) + return result - def server_time(self) -> float: - """ - Gets the current time (as microseconds since the Unix epoch) on the server. - """ - r = self._session.get(self.api.endpoints[ENDPOINT_NOW]) - r.raise_for_status() - result = r.json() - raw_epoch_time = result.get("epoch_time") - if raw_epoch_time is None: - raise UnexpectedResponse( - f'Server response does not contain "epoch_time": {result}' - ) - - try: - epoch_time = float(raw_epoch_time) - except: - raise UnexpectedResponse( - f"Could not process epoch time as a float: {raw_epoch_time}" - ) - - return epoch_time - - def authorize(self, access_token: str) -> None: - if not access_token: - logger.warning("Setting authorization header to empty token.") - self._session.headers.update({"Authorization": f"Bearer {access_token}"}) - - def requires_authorization(self): - if self._session.headers.get("Authorization") is None: - raise Unauthenticated( - 'This method requires that you authenticate to the API, either by calling the "authorize" method with an API token or by calling the "login" method.' - ) - - def login(self, username: str, password: Optional[str] = None) -> str: - """ - Authorizes this client to act as the given user when communicating with the Moonstream API. - - To register an account on the production Moonstream API, go to https://moonstream.to. - - Arguments: - username - Username of the user to authenticate as. - password - Optional password for the user. If this is not provided, you will be prompted for - the password. - """ - if password is None: - password = input(f"Moonstream password for {username}: ") - - r = self._session.post( - self.api.endpoints[ENDPOINT_TOKEN], - data={"username": username, "password": password}, - ) - r.raise_for_status() - - token = r.json() - self.authorize(token["id"]) - return token - - def logout(self) -> None: - """ - Logs the current user out of the Moonstream client. - """ - self._session.delete(self.api.endpoints[ENDPOINT_TOKEN]) - self._session.headers.pop("Authorization") - - def subscription_types(self) -> Dict[str, Any]: - """ - Gets the currently available subscription types on the Moonstream API. - """ - r = self._session.get(self.api.endpoints[ENDPOINT_SUBSCRIPTION_TYPES]) - r.raise_for_status() - return r.json() - - def list_subscriptions(self) -> Dict[str, Any]: - """ - Gets the currently authorized user's subscriptions from the API server. - """ - self.requires_authorization() - r = self._session.get(self.api.endpoints[ENDPOINT_SUBSCRIPTIONS]) - r.raise_for_status() - return r.json() - - def create_subscription( - self, subscription_type: str, label: str, color: str, specifier: str = "" - ) -> Dict[str, Any]: - """ - Creates a subscription. - - Arguments: - subscription_type - The type of subscription you would like to create. To see the available subscription - types, call the "subscription_types" method on this Moonstream client. This argument must be - the "id" if the subscription type you want. - label - A label for the subscription. This will identify the subscription to you in your stream. - color - A hexadecimal color to associate with the subscription. - specifier - A specifier for the subscription, which must correspond to one of the choices in the - subscription type. This is optional because some subscription types do not require a specifier. - - Returns: The subscription resource that was created on the backend. - """ - self.requires_authorization() - r = self._session.post( - self.api.endpoints[ENDPOINT_SUBSCRIPTIONS], - data={ - "subscription_type_id": subscription_type, - "label": label, - "color": color, - "address": specifier, - }, - ) - r.raise_for_status() - return r.json() - - def delete_subscription(self, id: str) -> Dict[str, Any]: - """ - Delete a subscription by ID. - - Arguments: - id - ID of the subscription to delete. - - Returns: The subscription resource that was deleted. - """ - self.requires_authorization() - r = self._session.delete(f"{self.api.endpoints[ENDPOINT_SUBSCRIPTIONS]}{id}") - r.raise_for_status() - return r.json() - - def update_subscription( - self, id: str, label: Optional[str] = None, color: Optional[str] = None - ) -> Dict[str, Any]: - """ - Update a subscription label or color. - - Arguments: - label - New label for subscription (optional). - color - New color for subscription (optional). - - Returns - If neither label or color are specified, raises a ValueError. Otherwise PUTs the updated - information to the server and returns the updated subscription resource. - """ - if label is None and color is None: - raise ValueError( - "At least one of the arguments to this method should not be None." - ) - self.requires_authorization() - data = {} - if label is not None: - data["label"] = label - if color is not None: - data["color"] = color - - r = self._session.put( - f"{self.api.endpoints[ENDPOINT_SUBSCRIPTIONS]}{id}", data=data - ) - r.raise_for_status() - return r.json() - - def latest_events(self, q: str = "") -> List[Dict[str, Any]]: - """ - Returns the latest events in your stream. You can optionally provide a query parameter to - constrain the query to specific subscription types or to specific subscriptions. - - Arguments: - - q - Optional query (default is the empty string). The syntax to constrain to a particular - type of subscription is "type:". For example, to get the latest event from - your Ethereum transaction pool subscriptions, you would use "type:ethereum_txpool". - - Returns: A list of the latest events in your stream. - """ - self.requires_authorization() - query_params: Dict[str, str] = {} - if q: - query_params["q"] = q - r = self._session.get( - self.api.endpoints[ENDPOINT_STREAMS_LATEST], params=query_params - ) - r.raise_for_status() - return r.json() - - def next_event( - self, end_time: int, include_end: bool = True, q: str = "" - ) -> Optional[Dict[str, Any]]: - """ - Return the earliest event in your stream that occurred after the given end_time. - - Arguments: - - end_time - Time after which you want to retrieve the earliest event from your stream. - - include_end - If True, the result is the first event that occurred in your stream strictly - *after* the end time. If False, then you will get the first event that occurred in your - stream *on* or *after* the end time. - - q - Optional query to filter over your available subscriptions and subscription types. - - Returns: None if no event has occurred after the given end time, else returns a dictionary - representing that event. - """ - self.requires_authorization() - query_params: Dict[str, Any] = { - "end_time": end_time, - "include_end": include_end, - } - if q: - query_params["q"] = q - r = self._session.get( - self.api.endpoints[ENDPOINT_STREAMS_NEXT], params=query_params - ) - r.raise_for_status() - return r.json() - - def previous_event( - self, start_time: int, include_start: bool = True, q: str = "" - ) -> Optional[Dict[str, Any]]: - """ - Return the latest event in your stream that occurred before the given start_time. - - Arguments: - - start_time - Time before which you want to retrieve the latest event from your stream. - - include_start - If True, the result is the last event that occurred in your stream strictly - *before* the start time. If False, then you will get the last event that occurred in your - stream *on* or *before* the start time. - - q - Optional query to filter over your available subscriptions and subscription types. - - Returns: None if no event has occurred before the given start time, else returns a dictionary - representing that event. - """ - self.requires_authorization() - query_params: Dict[str, Any] = { - "start_time": start_time, - "include_start": include_start, - } - if q: - query_params["q"] = q - r = self._session.get( - self.api.endpoints[ENDPOINT_STREAMS_PREVIOUS], params=query_params - ) - r.raise_for_status() - return r.json() - - def events( + def create_query( self, - start_time: int, - end_time: int, - include_start: bool = False, - include_end: bool = False, - q: str = "", - ) -> Dict[str, Any]: + token: Union[str, uuid.UUID], + query: str, + name: str, + public: bool = False, + auth_type: AuthType = AuthType.bearer, + ) -> MoonstreamQuery: """ - Return all events in your stream that occurred between the given start and end times. - - Arguments: - - start_time - Time after which you want to query your stream. - - include_start - Whether or not events that occurred exactly at the start_time should be included in the results. - - end_time - Time before which you want to query your stream. - - include_end - Whether or not events that occurred exactly at the end_time should be included in the results. - - q - Optional query to filter over your available subscriptions and subscription types. - - Returns: A dictionary representing the results of your query. + Creates new query. """ - self.requires_authorization() - query_params: Dict[str, Any] = { - "start_time": start_time, - "include_start": include_start, - "end_time": end_time, - "include_end": include_end, + json = { + "query": query, + "name": name, + "public": public, } - if q: - query_params["q"] = q + headers = { + "Authorization": f"{auth_type.value} {token}", + } + response = self._call( + method=Method.POST, + url=f"{self.api.endpoints[ENDPOINT_QUERIES]}", + headers=headers, + json=json, + ) - r = self._session.get(self.api.endpoints[ENDPOINT_STREAMS], params=query_params) - r.raise_for_status() - return r.json() + return MoonstreamQuery( + id=response["id"], + journal_url=response["journal_url"], + name=response["title"], + query=response["content"], + tags=response["tags"], + created_at=response["created_at"], + updated_at=response["updated_at"], + ) - def create_stream( + def list_queries( self, - start_time: int, - end_time: Optional[int] = None, - q: str = "", - ) -> Generator[Dict[str, Any], None, None]: + token: Union[str, uuid.UUID], + auth_type: AuthType = AuthType.bearer, + ) -> MoonstreamQueries: """ - Return a stream of event. Event packs will be generated with 1 hour time range. - - Arguments: - - start_time - One of time border. - - end_time - Time until the end of stream, if set to None stream will be going forward endlessly. - - q - Optional query to filter over your available subscriptions and subscription types. - - Returns: A dictionary stream representing the results of your query. + Returns list of all queries available to user. """ - # TODO(kompotkot): Add tests - shift_two_hours = 2 * 60 * 60 # 2 hours - shift_half_hour = 1 * 30 * 30 # 30 min + headers = { + "Authorization": f"{auth_type.value} {token}", + } + response = self._call( + method=Method.GET, + url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/list", + headers=headers, + ) - def fetch_events( - modified_start_time: int, modified_end_time: int - ) -> Generator[Tuple[Dict[str, Any], bool], None, None]: - # If it is going from top to bottom in history, - # then time_range will be reversed - reversed_time = False - if modified_start_time > modified_end_time: - reversed_time = True - max_boundary = max(modified_start_time, modified_end_time) - min_boundary = min(modified_start_time, modified_end_time) - - time_range_list = [] - # 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}] - if max_boundary - min_boundary > shift_half_hour: - for i in range(min_boundary, max_boundary, shift_half_hour): - end_i = ( - i + shift_half_hour - 1 - if i + shift_half_hour <= max_boundary - else max_boundary - ) - time_range_list.append({"start_time": i, "end_time": end_i}) - else: - time_range_list.append( - {"start_time": min_boundary, "end_time": max_boundary} + return MoonstreamQueries( + queries=[ + MoonstreamQuery( + id=query["entry_id"], + name=query["name"], + query_type=query["type"], + user=query["user"], + user_id=query["user_id"], ) - if reversed_time: - time_range_list.reverse() + for query in response + ] + ) - for time_range in time_range_list: - r_json = self.events( - start_time=time_range["start_time"], - end_time=time_range["end_time"], - include_start=True, - include_end=True, - q=q, - ) + def exec_query( + self, + token: Union[str, uuid.UUID], + name: str, + params: Dict[str, Any] = {}, + auth_type: AuthType = AuthType.bearer, + ) -> MoonstreamQueryResultUrl: + """ + Executes queries and upload data to external storage. + """ + headers = { + "Authorization": f"{auth_type.value} {token}", + } + json = { + "params": params, + } + response = self._call( + method=Method.POST, + url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/{name}/update_data", + headers=headers, + json=json, + ) - yield r_json, reversed_time + return MoonstreamQueryResultUrl(url=response["url"]) - time_range_list = time_range_list[:] + def delete_query( + self, + token: Union[str, uuid.UUID], + name: str, + auth_type: AuthType = AuthType.bearer, + ) -> uuid.UUID: + """ + Deletes query specified by name. + """ + headers = { + "Authorization": f"{auth_type.value} {token}", + } + response = self._call( + method=Method.DELETE, + url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/{name}", + headers=headers, + ) - if end_time is None: - float_start_time = start_time - - while True: - end_time = int(self.server_time()) - # If time range is greater then 2 hours, - # shift float_start time close to end_time to prevent stream block - if end_time - float_start_time > shift_two_hours: - float_start_time = shift_two_hours - for r_json, reversed_time in fetch_events(float_start_time, end_time): - - yield r_json - - events = r_json.get("events", []) - if len(events) > 0: - # Updating float_start_time after first iteration to last event time - if reversed_time: - float_start_time = events[-1].get("event_timestamp") - 1 - else: - float_start_time = events[0].get("event_timestamp") + 1 - - else: - # If there are no events in response, wait - # until new will be added - time.sleep(5) - else: - for r_json, reversed_time in fetch_events(start_time, end_time): - yield r_json - - -def client_from_env() -> Moonstream: - """ - Produces a Moonstream client instantiated using the following environment variables: - - MOONSTREAM_API_URL: Specifies the url parameter on the Moonstream client - - MOONSTREAM_TIMEOUT_SECONDS: Specifies the request timeout - - MOONSTREAM_ACCESS_TOKEN: If this environment variable is defined, the client sets this token as - the authorization header for all Moonstream API requests. - """ - kwargs: Dict[str, Any] = {} - - url = os.environ.get("MOONSTREAM_API_URL") - if url is not None: - kwargs["url"] = url - - raw_timeout = os.environ.get("MOONSTREAM_TIMEOUT_SECONDS") - timeout: Optional[float] = None - if raw_timeout is not None: - try: - timeout = float(raw_timeout) - except: - raise ValueError( - f"Could not convert MOONSTREAM_TIMEOUT_SECONDS ({raw_timeout}) to float." - ) - - kwargs["timeout"] = timeout - - moonstream_client = Moonstream(**kwargs) - - access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN") - if access_token is not None: - moonstream_client.authorize(access_token) - - return moonstream_client + return response["id"] diff --git a/clients/python/moonstream/data.py b/clients/python/moonstream/data.py new file mode 100644 index 00000000..50ca8c5a --- /dev/null +++ b/clients/python/moonstream/data.py @@ -0,0 +1,47 @@ +import uuid +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + + +@dataclass(frozen=True) +class APISpec: + url: str + endpoints: Dict[str, str] + + +class AuthType(Enum): + bearer = "Bearer" + web3 = "Web3" + + +class Method(Enum): + DELETE = "delete" + GET = "get" + POST = "post" + PUT = "put" + + +@dataclass(frozen=True) +class MoonstreamQuery: + id: uuid.UUID + name: str + journal_url: Optional[str] = None + query: Optional[str] = None + tags: Optional[List[str]] = None + user: Optional[str] = None + user_id: Optional[uuid.UUID] = None + query_type: Optional[str] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + +@dataclass(frozen=True) +class MoonstreamQueries: + queries: List[MoonstreamQuery] + + +@dataclass(frozen=True) +class MoonstreamQueryResultUrl: + url: str diff --git a/clients/python/moonstream/exceptions.py b/clients/python/moonstream/exceptions.py new file mode 100644 index 00000000..9e7910f7 --- /dev/null +++ b/clients/python/moonstream/exceptions.py @@ -0,0 +1,24 @@ +from typing import Any, Optional + + +class MoonstreamResponseException(Exception): + """ + Raised when Moonstream server response with error. + """ + + def __init__( + self, + message, + status_code: int, + detail: Optional[Any] = None, + ) -> None: + super().__init__(message) + self.status_code = status_code + if detail is not None: + self.detail = detail + + +class MoonstreamUnexpectedResponse(Exception): + """ + Raised when Moonstream server response is unexpected (e.g. unparseable). + """ diff --git a/clients/python/moonstream/test_client.py b/clients/python/moonstream/test_client.py index e0c00cf6..1c1f481f 100644 --- a/clients/python/moonstream/test_client.py +++ b/clients/python/moonstream/test_client.py @@ -1,138 +1,54 @@ -from dataclasses import FrozenInstanceError import os import unittest from . import client -class TestMoonstreamClient(unittest.TestCase): - def test_client_init(self): - m = client.Moonstream() - self.assertEqual(m.api.url, "https://api.moonstream.to") - self.assertIsNone(m.timeout) - self.assertGreater(len(m.api.endpoints), 0) - - def test_client_init_with_timeout(self): - timeout = 7 - m = client.Moonstream(timeout=timeout) - self.assertEqual(m.api.url, "https://api.moonstream.to") - self.assertEqual(m.timeout, timeout) - self.assertGreater(len(m.api.endpoints), 0) - - def test_client_with_custom_url_and_timeout(self): - timeout = 9 - url = "https://my.custom.api.url" - m = client.Moonstream(url=url, timeout=timeout) - self.assertEqual(m.api.url, url) - self.assertEqual(m.timeout, timeout) - self.assertGreater(len(m.api.endpoints), 0) - - def test_client_with_custom_messy_url_and_timeout(self): - timeout = 3.5 - url = "https://my.custom.api.url/" - m = client.Moonstream(url=url, timeout=timeout) - self.assertEqual(m.api.url, url) - self.assertEqual(m.timeout, timeout) - self.assertGreater(len(m.api.endpoints), 0) - - def test_client_with_custom_messy_url_no_protocol_and_timeout(self): - timeout = 5.5 - url = "my.custom.api.url/" - m = client.Moonstream(url=url, timeout=timeout) - self.assertEqual(m.api.url, url) - self.assertEqual(m.timeout, timeout) - self.assertGreater(len(m.api.endpoints), 0) - - def test_immutable_api_url(self): - m = client.Moonstream() - with self.assertRaises(FrozenInstanceError): - m.api.url = "lol" - - def test_immutable_api_endpoints(self): - m = client.Moonstream() - with self.assertRaises(FrozenInstanceError): - m.api.endpoints = {} - - def test_mutable_timeout(self): - original_timeout = 5.0 - updated_timeout = 10.5 - m = client.Moonstream(timeout=original_timeout) - self.assertEqual(m.timeout, original_timeout) - m.timeout = updated_timeout - self.assertEqual(m.timeout, updated_timeout) - - -class TestMoonstreamClientFromEnv(unittest.TestCase): +class TestMoonstreamCalls(unittest.TestCase): def setUp(self): - self.old_moonstream_api_url = os.environ.get("MOONSTREAM_API_URL") - self.old_moonstream_timeout_seconds = os.environ.get( - "MOONSTREAM_TIMEOUT_SECONDS" + url = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to") + self.token = os.environ.get("MOONSTREAM_ACCESS_TOKEN") + if self.token is None: + raise Exception("MOONSTREAM_ACCESS_TOKEN should be specified") + self.m = client.Moonstream(moonstream_api_url=url) + + queries = self.m.list_queries(self.token) + for query in queries.queries: + if query.name.startswith("test_query_name"): + self.m.delete_query(self.token, query.name) + + def test_ping(self): + response = self.m.ping() + self.assertEqual(response["status"], "ok") + + def test_create_query(self): + query = "SELECT count(*) FROM polygon_blocks" + name = "test-query-name-1" + response = self.m.create_query(self.token, query, name) + self.assertEqual(f"Query:{name.replace('-', '_')}", response.name) + + def test_list_queries(self): + query = ( + "SELECT hash,block_number FROM polygon_blocks WHERE block_number = 21175765" ) - self.old_moonstream_access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN") + name = "test-query-name-2" + response_1 = self.m.create_query(self.token, query, name) + self.assertEqual(f"Query:{name.replace('-', '_')}", response_1.name) - self.moonstream_api_url = "https://custom.example.com" - self.moonstream_timeout_seconds = 15.333333 - self.moonstream_access_token = "1d431ca4-af9b-4c3a-b7b9-3cc79f3b0900" + response_2 = self.m.list_queries(self.token) + self.assertGreaterEqual(len(response_2.queries), 1) - os.environ["MOONSTREAM_API_URL"] = self.moonstream_api_url - os.environ["MOONSTREAM_TIMEOUT_SECONDS"] = str(self.moonstream_timeout_seconds) - os.environ["MOONSTREAM_ACCESS_TOKEN"] = self.moonstream_access_token + def test_delete_query(self): + query = "SELECT 1" + name = "test-query-name-0" + response_1 = self.m.create_query(self.token, query, name) + self.assertEqual(f"Query:{name.replace('-', '_')}", response_1.name) + + response_2 = self.m.delete_query(self.token, name.replace("-", "_")) + self.assertEqual(response_1.id, response_2) def tearDown(self) -> None: - del os.environ["MOONSTREAM_API_URL"] - del os.environ["MOONSTREAM_TIMEOUT_SECONDS"] - del os.environ["MOONSTREAM_ACCESS_TOKEN"] - - if self.old_moonstream_api_url is not None: - os.environ["MOONSTREAM_API_URL"] = self.old_moonstream_api_url - if self.old_moonstream_timeout_seconds is not None: - os.environ[ - "MOONSTREAM_TIMEOUT_SECONDS" - ] = self.old_moonstream_timeout_seconds - if self.old_moonstream_access_token is not None: - os.environ["MOONSTREAM_ACCESS_TOKEN"] = self.old_moonstream_access_token - - def test_client_from_env(self): - m = client.client_from_env() - self.assertEqual(m.api.url, self.moonstream_api_url) - self.assertEqual(m.timeout, self.moonstream_timeout_seconds) - self.assertIsNone(m.requires_authorization()) - - authorization_header = m._session.headers["Authorization"] - self.assertEqual(authorization_header, f"Bearer {self.moonstream_access_token}") - - -class TestMoonstreamEndpoints(unittest.TestCase): - def setUp(self): - self.url = "https://api.moonstream.to" - self.normalized_url = "https://api.moonstream.to" - - def test_moonstream_endpoints(self): - endpoints = client.moonstream_endpoints(self.url) - self.assertDictEqual( - endpoints, - { - client.ENDPOINT_PING: f"{self.normalized_url}{client.ENDPOINT_PING}", - client.ENDPOINT_VERSION: f"{self.normalized_url}{client.ENDPOINT_VERSION}", - client.ENDPOINT_NOW: f"{self.normalized_url}{client.ENDPOINT_NOW}", - client.ENDPOINT_TOKEN: f"{self.normalized_url}{client.ENDPOINT_TOKEN}", - client.ENDPOINT_SUBSCRIPTION_TYPES: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTION_TYPES}", - client.ENDPOINT_SUBSCRIPTIONS: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTIONS}", - client.ENDPOINT_STREAMS: f"{self.normalized_url}{client.ENDPOINT_STREAMS}", - client.ENDPOINT_STREAMS_LATEST: f"{self.normalized_url}{client.ENDPOINT_STREAMS_LATEST}", - client.ENDPOINT_STREAMS_NEXT: f"{self.normalized_url}{client.ENDPOINT_STREAMS_NEXT}", - client.ENDPOINT_STREAMS_PREVIOUS: f"{self.normalized_url}{client.ENDPOINT_STREAMS_PREVIOUS}", - }, - ) - - -class TestMoonstreamEndpointsMessyURL(TestMoonstreamEndpoints): - def setUp(self): - self.url = "https://api.moonstream.to/" - self.normalized_url = "https://api.moonstream.to" - - -class TestMoonstreamEndpointsMessyURLWithNoProtocol(TestMoonstreamEndpoints): - def setUp(self): - self.url = "api.moonstream.to/" - self.normalized_url = "http://api.moonstream.to" + queries = self.m.list_queries(self.token) + for query in queries.queries: + if query.name.startswith("test_query_name"): + self.m.delete_query(self.token, query.name) diff --git a/clients/python/moonstream/version.py b/clients/python/moonstream/version.py index 96c1d3e8..ac068728 100644 --- a/clients/python/moonstream/version.py +++ b/clients/python/moonstream/version.py @@ -1 +1 @@ -MOONSTREAM_CLIENT_VERSION = "0.0.3" +MOONSTREAM_CLIENT_VERSION = "0.1.1" diff --git a/clients/python/sample.env b/clients/python/sample.env new file mode 100644 index 00000000..a9262bc2 --- /dev/null +++ b/clients/python/sample.env @@ -0,0 +1,3 @@ +# Tests variables +export MOONSTREAM_API_URL="https://api.moonstream.to" +export MOONSTREAM_ACCESS_TOKEN="" diff --git a/clients/python/setup.py b/clients/python/setup.py index f75e6813..c6abb690 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -11,7 +11,7 @@ setup( version=MOONSTREAM_CLIENT_VERSION, packages=find_packages(), package_data={"moonstream": ["py.typed"]}, - install_requires=["requests", "dataclasses; python_version=='3.6'"], + install_requires=["requests", "pydantic", "dataclasses; python_version=='3.6'"], extras_require={ "dev": [ "black", From 06cdb160429eb518d9374a23d05bd4dc1e57accb Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 27 Oct 2022 09:11:40 +0000 Subject: [PATCH 2/7] Updated release script --- .github/workflows/release.clients.python.yml | 19 ++----------------- clients/python/tag.bash | 12 ------------ 2 files changed, 2 insertions(+), 29 deletions(-) delete mode 100755 clients/python/tag.bash diff --git a/.github/workflows/release.clients.python.yml b/.github/workflows/release.clients.python.yml index 5aa3e1b2..007f30bb 100644 --- a/.github/workflows/release.clients.python.yml +++ b/.github/workflows/release.clients.python.yml @@ -1,9 +1,8 @@ name: Publish Moonstream Python client library on: - push: - tags: - - "clients/python/v*" + release: + types: [published] defaults: run: @@ -29,17 +28,3 @@ jobs: run: | python setup.py sdist bdist_wheel twine upload dist/* - create_release: - runs-on: ubuntu-20.04 - steps: - - uses: actions/create-release@v1 - id: create_release - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - tag_name: ${{ github.ref }} - release_name: "Moonstream Python client library - ${{ github.ref }}" - body: | - Version ${{ github.ref }} of the Moonstream Python client library. - draft: true - prerelease: false diff --git a/clients/python/tag.bash b/clients/python/tag.bash deleted file mode 100755 index 34b87672..00000000 --- a/clients/python/tag.bash +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env bash -set -e -TAG="clients/python/v$(python setup.py --version)" -read -r -p "Tag: $TAG -- tag and push (y/n)?" ACCEPT -if [ "$ACCEPT" = "y" ] -then - echo "Tagging and pushing: $TAG..." - git tag "$TAG" - git push upstream "$TAG" -else - echo "noop" -fi From b2e598e0f835958f1356faa85cba7913165172cd Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 27 Oct 2022 09:38:35 +0000 Subject: [PATCH 3/7] Timeouts --- clients/python/moonstream/client.py | 26 ++++++++++++++++++++------ clients/python/moonstream/settings.py | 13 +++++++++++++ 2 files changed, 33 insertions(+), 6 deletions(-) create mode 100644 clients/python/moonstream/settings.py diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index abc7b62d..4bdbb6a7 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -12,6 +12,7 @@ from .data import ( MoonstreamQueryResultUrl, ) from .exceptions import MoonstreamResponseException, MoonstreamUnexpectedResponse +from .settings import MOONSTREAM_API_URL, MOONSTREAM_REQUEST_TIMEOUT ENDPOINT_PING = "/ping" ENDPOINT_VERSION = "/version" @@ -43,9 +44,7 @@ class Moonstream: A Moonstream client configured to communicate with a given Moonstream API server. """ - def __init__( - self, moonstream_api_url: str = "https://api.moonstream.to", timeout: float = 1 - ): + def __init__(self, moonstream_api_url: str = MOONSTREAM_API_URL): """ Initializes a Moonstream API client. @@ -55,11 +54,18 @@ class Moonstream: """ endpoints = moonstream_endpoints(moonstream_api_url) self.api = APISpec(url=moonstream_api_url, endpoints=endpoints) - self.timeout = timeout - def _call(self, method: Method, url: str, **kwargs): + def _call( + self, + method: Method, + url: str, + timeout: float = MOONSTREAM_REQUEST_TIMEOUT, + **kwargs, + ): try: - response = requests.request(method.value, url=url, **kwargs) + response = requests.request( + method.value, url=url, timeout=timeout, **kwargs + ) response.raise_for_status() except requests.exceptions.RequestException as err: r = err.response @@ -102,6 +108,7 @@ class Moonstream: name: str, public: bool = False, auth_type: AuthType = AuthType.bearer, + timeout: float = MOONSTREAM_REQUEST_TIMEOUT, ) -> MoonstreamQuery: """ Creates new query. @@ -119,6 +126,7 @@ class Moonstream: url=f"{self.api.endpoints[ENDPOINT_QUERIES]}", headers=headers, json=json, + timeout=timeout, ) return MoonstreamQuery( @@ -135,6 +143,7 @@ class Moonstream: self, token: Union[str, uuid.UUID], auth_type: AuthType = AuthType.bearer, + timeout: float = MOONSTREAM_REQUEST_TIMEOUT, ) -> MoonstreamQueries: """ Returns list of all queries available to user. @@ -146,6 +155,7 @@ class Moonstream: method=Method.GET, url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/list", headers=headers, + timeout=timeout, ) return MoonstreamQueries( @@ -167,6 +177,7 @@ class Moonstream: name: str, params: Dict[str, Any] = {}, auth_type: AuthType = AuthType.bearer, + timeout: float = MOONSTREAM_REQUEST_TIMEOUT, ) -> MoonstreamQueryResultUrl: """ Executes queries and upload data to external storage. @@ -182,6 +193,7 @@ class Moonstream: url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/{name}/update_data", headers=headers, json=json, + timeout=timeout, ) return MoonstreamQueryResultUrl(url=response["url"]) @@ -191,6 +203,7 @@ class Moonstream: token: Union[str, uuid.UUID], name: str, auth_type: AuthType = AuthType.bearer, + timeout: float = MOONSTREAM_REQUEST_TIMEOUT, ) -> uuid.UUID: """ Deletes query specified by name. @@ -202,6 +215,7 @@ class Moonstream: method=Method.DELETE, url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/{name}", headers=headers, + timeout=timeout, ) return response["id"] diff --git a/clients/python/moonstream/settings.py b/clients/python/moonstream/settings.py new file mode 100644 index 00000000..c5a179ab --- /dev/null +++ b/clients/python/moonstream/settings.py @@ -0,0 +1,13 @@ +import os + +MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to") + +MOONSTREAM_REQUEST_TIMEOUT = 10 +MOONSTREAM_REQUEST_TIMEOUT_RAW = os.environ.get("MOONSTREAM_REQUEST_TIMEOUT") +try: + if MOONSTREAM_REQUEST_TIMEOUT_RAW is not None: + MOONSTREAM_REQUEST_TIMEOUT = int(MOONSTREAM_REQUEST_TIMEOUT_RAW) +except: + raise Exception( + f"Could not parse MOONSTREAM_REQUEST_TIMEOUT environment variable as int: {MOONSTREAM_REQUEST_TIMEOUT_RAW}" + ) From c8f4d4efdc22d7ac7954a9fd7f75292dd529989d Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 27 Oct 2022 11:35:35 +0000 Subject: [PATCH 4/7] Download query results data endpoint --- clients/python/moonstream/client.py | 25 +++++++++++++++++++++++++ clients/python/moonstream/data.py | 5 +++++ 2 files changed, 30 insertions(+) diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index 4bdbb6a7..ddaf0a03 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -10,6 +10,7 @@ from .data import ( MoonstreamQueries, MoonstreamQuery, MoonstreamQueryResultUrl, + OutputType, ) from .exceptions import MoonstreamResponseException, MoonstreamUnexpectedResponse from .settings import MOONSTREAM_API_URL, MOONSTREAM_REQUEST_TIMEOUT @@ -198,6 +199,30 @@ class Moonstream: return MoonstreamQueryResultUrl(url=response["url"]) + def download_query_results( + self, + url: str, + output_type: OutputType = OutputType.JSON, + timeout: float = MOONSTREAM_REQUEST_TIMEOUT, + **kwargs, + ) -> Any: + """ + Fetch results of query from url. + """ + try: + response = requests.request( + Method.GET.value, url=url, timeout=timeout, **kwargs + ) + response.raise_for_status() + except Exception as e: + raise Exception(str(e)) + + output = response + if output_type == OutputType.JSON: + output = response.json() + + return output + def delete_query( self, token: Union[str, uuid.UUID], diff --git a/clients/python/moonstream/data.py b/clients/python/moonstream/data.py index 50ca8c5a..e26e2a87 100644 --- a/clients/python/moonstream/data.py +++ b/clients/python/moonstream/data.py @@ -23,6 +23,11 @@ class Method(Enum): PUT = "put" +class OutputType(Enum): + CSV = "csv" + JSON = "json" + + @dataclass(frozen=True) class MoonstreamQuery: id: uuid.UUID From 48a1c8fbed9f5724a55ca37c0823efc71105df01 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 27 Oct 2022 12:18:34 +0000 Subject: [PATCH 5/7] Push to AWS S3 bucker support --- clients/python/moonstream/aws/__init__.py | 0 clients/python/moonstream/aws/bucket.py | 24 +++++++++++++++++++ clients/python/moonstream/client.py | 29 +++++++++++++++++++---- clients/python/mypy.ini | 4 ++++ clients/python/setup.py | 1 + 5 files changed, 54 insertions(+), 4 deletions(-) create mode 100644 clients/python/moonstream/aws/__init__.py create mode 100644 clients/python/moonstream/aws/bucket.py create mode 100644 clients/python/mypy.ini diff --git a/clients/python/moonstream/aws/__init__.py b/clients/python/moonstream/aws/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/clients/python/moonstream/aws/bucket.py b/clients/python/moonstream/aws/bucket.py new file mode 100644 index 00000000..4eab58f3 --- /dev/null +++ b/clients/python/moonstream/aws/bucket.py @@ -0,0 +1,24 @@ +from typing import Any, Dict + +import boto3 + + +def upload_to_aws_s3_bucket( + data: str, + bucket: str, + key: str, + metadata: Dict[str, Any] = {}, +) -> str: + """ + Push data to AWS S3 bucket and return URL to object. + """ + s3 = boto3.client("s3") + s3.put_object( + Body=data, + Bucket=bucket, + Key=key, + ContentType="application/json", + Metadata=metadata, + ) + + return f"{bucket}/{key}" diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index ddaf0a03..036afa5e 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -3,6 +3,10 @@ from typing import Any, Dict, Union import requests +try: + from .aws.bucket import upload_to_aws_s3_bucket +except Exception as e: + pass from .data import ( APISpec, AuthType, @@ -68,12 +72,12 @@ class Moonstream: method.value, url=url, timeout=timeout, **kwargs ) response.raise_for_status() - except requests.exceptions.RequestException as err: - r = err.response - if not err.response: + except requests.exceptions.RequestException as e: + r = e.response + if not e.response: # Connection errors, timeouts, etc... raise MoonstreamResponseException( - "Network error", status_code=599, detail=str(err) + "Network error", status_code=599, detail=str(e) ) if r.headers.get("Content-Type") == "application/json": exception_detail = r.json()["detail"] @@ -223,6 +227,23 @@ class Moonstream: return output + def upload_query_results( + self, data: str, bucket: str, key: str, metadata: Dict[str, Any] = {} + ) -> str: + """ + Uploads data to AWS S3 bucket. + + Requirements: "pip install -e .[aws]" with "boto3" module. + """ + try: + url = upload_to_aws_s3_bucket( + data=data, bucket=bucket, key=key, metadata=metadata + ) + except Exception as e: + raise Exception(str(e)) + + return url + def delete_query( self, token: Union[str, uuid.UUID], diff --git a/clients/python/mypy.ini b/clients/python/mypy.ini new file mode 100644 index 00000000..29ecf6ac --- /dev/null +++ b/clients/python/mypy.ini @@ -0,0 +1,4 @@ +[mypy] + +[mypy-boto3.*] +ignore_missing_imports = True \ No newline at end of file diff --git a/clients/python/setup.py b/clients/python/setup.py index c6abb690..a3733873 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -13,6 +13,7 @@ setup( package_data={"moonstream": ["py.typed"]}, install_requires=["requests", "pydantic", "dataclasses; python_version=='3.6'"], extras_require={ + "aws": ["boto3"], "dev": [ "black", "mypy", From c21dff3df89dd6557084d5ed755f3b4888656dbe Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 27 Oct 2022 15:38:58 +0000 Subject: [PATCH 6/7] Fix slash issues --- clients/python/moonstream/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index 036afa5e..238f126b 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -128,7 +128,7 @@ class Moonstream: } response = self._call( method=Method.POST, - url=f"{self.api.endpoints[ENDPOINT_QUERIES]}", + url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/", headers=headers, json=json, timeout=timeout, From 8291fbb58efa99c26904e56e688654c9c62154fd Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 27 Oct 2022 16:12:20 +0000 Subject: [PATCH 7/7] Fix of redirect exception --- clients/python/moonstream/client.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index 238f126b..0b55c36f 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -72,22 +72,6 @@ class Moonstream: method.value, url=url, timeout=timeout, **kwargs ) response.raise_for_status() - except requests.exceptions.RequestException as e: - r = e.response - if not e.response: - # Connection errors, timeouts, etc... - raise MoonstreamResponseException( - "Network error", status_code=599, detail=str(e) - ) - if r.headers.get("Content-Type") == "application/json": - exception_detail = r.json()["detail"] - else: - exception_detail = r.text - raise MoonstreamResponseException( - "An exception occurred at Bugout API side", - status_code=r.status_code, - detail=exception_detail, - ) except Exception as e: raise MoonstreamUnexpectedResponse(str(e)) return response.json()