kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #687 from bugout-dev/client-supp-queries
New python client query api basedpull/692/head
commit
e13ef1cbfb
|
@ -1,9 +1,8 @@
|
|||
name: Publish Moonstream Python client library
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- "clients/python/v*"
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
defaults:
|
||||
run:
|
||||
|
@ -29,17 +28,3 @@ jobs:
|
|||
run: |
|
||||
python setup.py sdist bdist_wheel
|
||||
twine upload dist/*
|
||||
create_release:
|
||||
runs-on: ubuntu-20.04
|
||||
steps:
|
||||
- uses: actions/create-release@v1
|
||||
id: create_release
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
with:
|
||||
tag_name: ${{ github.ref }}
|
||||
release_name: "Moonstream Python client library - ${{ github.ref }}"
|
||||
body: |
|
||||
Version ${{ github.ref }} of the Moonstream Python client library.
|
||||
draft: true
|
||||
prerelease: false
|
||||
|
|
|
@ -143,5 +143,10 @@ cython_debug/
|
|||
|
||||
# End of https://www.toptal.com/developers/gitignore/api/python
|
||||
|
||||
# Custom
|
||||
.moonstream-py/
|
||||
.venv/
|
||||
.secrets/
|
||||
prod.env
|
||||
dev.env
|
||||
test.env
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
from typing import Any, Dict
|
||||
|
||||
import boto3
|
||||
|
||||
|
||||
def upload_to_aws_s3_bucket(
|
||||
data: str,
|
||||
bucket: str,
|
||||
key: str,
|
||||
metadata: Dict[str, Any] = {},
|
||||
) -> str:
|
||||
"""
|
||||
Push data to AWS S3 bucket and return URL to object.
|
||||
"""
|
||||
s3 = boto3.client("s3")
|
||||
s3.put_object(
|
||||
Body=data,
|
||||
Bucket=bucket,
|
||||
Key=key,
|
||||
ContentType="application/json",
|
||||
Metadata=metadata,
|
||||
)
|
||||
|
||||
return f"{bucket}/{key}"
|
|
@ -1,41 +1,34 @@
|
|||
import logging
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, Generator, List, Optional, Tuple
|
||||
import uuid
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
import requests
|
||||
|
||||
from .version import MOONSTREAM_CLIENT_VERSION
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
log_level = logging.INFO
|
||||
if os.environ.get("DEBUG", "").lower() in ["true", "1"]:
|
||||
log_level = logging.DEBUG
|
||||
logger.setLevel(log_level)
|
||||
try:
|
||||
from .aws.bucket import upload_to_aws_s3_bucket
|
||||
except Exception as e:
|
||||
pass
|
||||
from .data import (
|
||||
APISpec,
|
||||
AuthType,
|
||||
Method,
|
||||
MoonstreamQueries,
|
||||
MoonstreamQuery,
|
||||
MoonstreamQueryResultUrl,
|
||||
OutputType,
|
||||
)
|
||||
from .exceptions import MoonstreamResponseException, MoonstreamUnexpectedResponse
|
||||
from .settings import MOONSTREAM_API_URL, MOONSTREAM_REQUEST_TIMEOUT
|
||||
|
||||
ENDPOINT_PING = "/ping"
|
||||
ENDPOINT_VERSION = "/version"
|
||||
ENDPOINT_NOW = "/now"
|
||||
ENDPOINT_TOKEN = "/users/token"
|
||||
ENDPOINT_SUBSCRIPTIONS = "/subscriptions/"
|
||||
ENDPOINT_SUBSCRIPTION_TYPES = "/subscriptions/types"
|
||||
ENDPOINT_STREAMS = "/streams/"
|
||||
ENDPOINT_STREAMS_LATEST = "/streams/latest"
|
||||
ENDPOINT_STREAMS_NEXT = "/streams/next"
|
||||
ENDPOINT_STREAMS_PREVIOUS = "/streams/previous"
|
||||
ENDPOINT_QUERIES = "/queries"
|
||||
|
||||
ENDPOINTS = [
|
||||
ENDPOINT_PING,
|
||||
ENDPOINT_VERSION,
|
||||
ENDPOINT_NOW,
|
||||
ENDPOINT_TOKEN,
|
||||
ENDPOINT_SUBSCRIPTIONS,
|
||||
ENDPOINT_SUBSCRIPTION_TYPES,
|
||||
ENDPOINT_STREAMS,
|
||||
ENDPOINT_STREAMS_LATEST,
|
||||
ENDPOINT_STREAMS_NEXT,
|
||||
ENDPOINT_STREAMS_PREVIOUS,
|
||||
ENDPOINT_QUERIES,
|
||||
]
|
||||
|
||||
|
||||
|
@ -43,473 +36,216 @@ def moonstream_endpoints(url: str) -> Dict[str, str]:
|
|||
"""
|
||||
Creates a dictionary of Moonstream API endpoints at the given Moonstream API URL.
|
||||
"""
|
||||
url_with_protocol = url
|
||||
if not (
|
||||
url_with_protocol.startswith("http://")
|
||||
or url_with_protocol.startswith("https://")
|
||||
):
|
||||
url_with_protocol = f"http://{url_with_protocol}"
|
||||
if not (url.startswith("http://") or url.startswith("https://")):
|
||||
url = f"http://{url}"
|
||||
|
||||
normalized_url = url_with_protocol.rstrip("/")
|
||||
normalized_url = url.rstrip("/")
|
||||
|
||||
return {endpoint: f"{normalized_url}{endpoint}" for endpoint in ENDPOINTS}
|
||||
|
||||
|
||||
class UnexpectedResponse(Exception):
|
||||
"""
|
||||
Raised when a server response cannot be parsed into the appropriate/expected Python structure.
|
||||
"""
|
||||
|
||||
|
||||
class Unauthenticated(Exception):
|
||||
"""
|
||||
Raised when a user tries to make a request that needs to be authenticated by they are not authenticated.
|
||||
"""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class APISpec:
|
||||
url: str
|
||||
endpoints: Dict[str, str]
|
||||
|
||||
|
||||
class Moonstream:
|
||||
"""
|
||||
A Moonstream client configured to communicate with a given Moonstream API server.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
url: str = "https://api.moonstream.to",
|
||||
timeout: Optional[float] = None,
|
||||
):
|
||||
def __init__(self, moonstream_api_url: str = MOONSTREAM_API_URL):
|
||||
"""
|
||||
Initializes a Moonstream API client.
|
||||
|
||||
Arguments:
|
||||
url - Moonstream API URL. By default this points to the production Moonstream API at https://api.moonstream.to,
|
||||
but you can replace it with the URL of any other Moonstream API instance.
|
||||
timeout - Timeout (in seconds) for Moonstream API requests. Default is None, which means that
|
||||
Moonstream API requests will never time out.
|
||||
|
||||
Returns: A Moonstream client.
|
||||
Arguments:
|
||||
url - Moonstream API URL. By default this points to the production Moonstream API at https://api.moonstream.to,
|
||||
but you can replace it with the URL of any other Moonstream API instance.
|
||||
"""
|
||||
endpoints = moonstream_endpoints(url)
|
||||
self.api = APISpec(url=url, endpoints=endpoints)
|
||||
self.timeout = timeout
|
||||
self._session = requests.Session()
|
||||
self._session.headers.update(
|
||||
{
|
||||
"User-Agent": f"Moonstream Python client (version {MOONSTREAM_CLIENT_VERSION})"
|
||||
}
|
||||
)
|
||||
endpoints = moonstream_endpoints(moonstream_api_url)
|
||||
self.api = APISpec(url=moonstream_api_url, endpoints=endpoints)
|
||||
|
||||
def _call(
|
||||
self,
|
||||
method: Method,
|
||||
url: str,
|
||||
timeout: float = MOONSTREAM_REQUEST_TIMEOUT,
|
||||
**kwargs,
|
||||
):
|
||||
try:
|
||||
response = requests.request(
|
||||
method.value, url=url, timeout=timeout, **kwargs
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
raise MoonstreamUnexpectedResponse(str(e))
|
||||
return response.json()
|
||||
|
||||
def ping(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Checks that you have a connection to the Moonstream API.
|
||||
"""
|
||||
r = self._session.get(self.api.endpoints[ENDPOINT_PING])
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
result = self._call(method=Method.GET, url=self.api.endpoints[ENDPOINT_PING])
|
||||
return result
|
||||
|
||||
def version(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Gets the Moonstream API version information from the server.
|
||||
"""
|
||||
r = self._session.get(self.api.endpoints[ENDPOINT_VERSION])
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
result = self._call(method=Method.GET, url=self.api.endpoints[ENDPOINT_VERSION])
|
||||
return result
|
||||
|
||||
def server_time(self) -> float:
|
||||
"""
|
||||
Gets the current time (as microseconds since the Unix epoch) on the server.
|
||||
"""
|
||||
r = self._session.get(self.api.endpoints[ENDPOINT_NOW])
|
||||
r.raise_for_status()
|
||||
result = r.json()
|
||||
raw_epoch_time = result.get("epoch_time")
|
||||
if raw_epoch_time is None:
|
||||
raise UnexpectedResponse(
|
||||
f'Server response does not contain "epoch_time": {result}'
|
||||
)
|
||||
|
||||
try:
|
||||
epoch_time = float(raw_epoch_time)
|
||||
except:
|
||||
raise UnexpectedResponse(
|
||||
f"Could not process epoch time as a float: {raw_epoch_time}"
|
||||
)
|
||||
|
||||
return epoch_time
|
||||
|
||||
def authorize(self, access_token: str) -> None:
|
||||
if not access_token:
|
||||
logger.warning("Setting authorization header to empty token.")
|
||||
self._session.headers.update({"Authorization": f"Bearer {access_token}"})
|
||||
|
||||
def requires_authorization(self):
|
||||
if self._session.headers.get("Authorization") is None:
|
||||
raise Unauthenticated(
|
||||
'This method requires that you authenticate to the API, either by calling the "authorize" method with an API token or by calling the "login" method.'
|
||||
)
|
||||
|
||||
def login(self, username: str, password: Optional[str] = None) -> str:
|
||||
"""
|
||||
Authorizes this client to act as the given user when communicating with the Moonstream API.
|
||||
|
||||
To register an account on the production Moonstream API, go to https://moonstream.to.
|
||||
|
||||
Arguments:
|
||||
username - Username of the user to authenticate as.
|
||||
password - Optional password for the user. If this is not provided, you will be prompted for
|
||||
the password.
|
||||
"""
|
||||
if password is None:
|
||||
password = input(f"Moonstream password for {username}: ")
|
||||
|
||||
r = self._session.post(
|
||||
self.api.endpoints[ENDPOINT_TOKEN],
|
||||
data={"username": username, "password": password},
|
||||
)
|
||||
r.raise_for_status()
|
||||
|
||||
token = r.json()
|
||||
self.authorize(token["id"])
|
||||
return token
|
||||
|
||||
def logout(self) -> None:
|
||||
"""
|
||||
Logs the current user out of the Moonstream client.
|
||||
"""
|
||||
self._session.delete(self.api.endpoints[ENDPOINT_TOKEN])
|
||||
self._session.headers.pop("Authorization")
|
||||
|
||||
def subscription_types(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Gets the currently available subscription types on the Moonstream API.
|
||||
"""
|
||||
r = self._session.get(self.api.endpoints[ENDPOINT_SUBSCRIPTION_TYPES])
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def list_subscriptions(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Gets the currently authorized user's subscriptions from the API server.
|
||||
"""
|
||||
self.requires_authorization()
|
||||
r = self._session.get(self.api.endpoints[ENDPOINT_SUBSCRIPTIONS])
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def create_subscription(
|
||||
self, subscription_type: str, label: str, color: str, specifier: str = ""
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Creates a subscription.
|
||||
|
||||
Arguments:
|
||||
subscription_type - The type of subscription you would like to create. To see the available subscription
|
||||
types, call the "subscription_types" method on this Moonstream client. This argument must be
|
||||
the "id" if the subscription type you want.
|
||||
label - A label for the subscription. This will identify the subscription to you in your stream.
|
||||
color - A hexadecimal color to associate with the subscription.
|
||||
specifier - A specifier for the subscription, which must correspond to one of the choices in the
|
||||
subscription type. This is optional because some subscription types do not require a specifier.
|
||||
|
||||
Returns: The subscription resource that was created on the backend.
|
||||
"""
|
||||
self.requires_authorization()
|
||||
r = self._session.post(
|
||||
self.api.endpoints[ENDPOINT_SUBSCRIPTIONS],
|
||||
data={
|
||||
"subscription_type_id": subscription_type,
|
||||
"label": label,
|
||||
"color": color,
|
||||
"address": specifier,
|
||||
},
|
||||
)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def delete_subscription(self, id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Delete a subscription by ID.
|
||||
|
||||
Arguments:
|
||||
id - ID of the subscription to delete.
|
||||
|
||||
Returns: The subscription resource that was deleted.
|
||||
"""
|
||||
self.requires_authorization()
|
||||
r = self._session.delete(f"{self.api.endpoints[ENDPOINT_SUBSCRIPTIONS]}{id}")
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def update_subscription(
|
||||
self, id: str, label: Optional[str] = None, color: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Update a subscription label or color.
|
||||
|
||||
Arguments:
|
||||
label - New label for subscription (optional).
|
||||
color - New color for subscription (optional).
|
||||
|
||||
Returns - If neither label or color are specified, raises a ValueError. Otherwise PUTs the updated
|
||||
information to the server and returns the updated subscription resource.
|
||||
"""
|
||||
if label is None and color is None:
|
||||
raise ValueError(
|
||||
"At least one of the arguments to this method should not be None."
|
||||
)
|
||||
self.requires_authorization()
|
||||
data = {}
|
||||
if label is not None:
|
||||
data["label"] = label
|
||||
if color is not None:
|
||||
data["color"] = color
|
||||
|
||||
r = self._session.put(
|
||||
f"{self.api.endpoints[ENDPOINT_SUBSCRIPTIONS]}{id}", data=data
|
||||
)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def latest_events(self, q: str = "") -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Returns the latest events in your stream. You can optionally provide a query parameter to
|
||||
constrain the query to specific subscription types or to specific subscriptions.
|
||||
|
||||
Arguments:
|
||||
- q - Optional query (default is the empty string). The syntax to constrain to a particular
|
||||
type of subscription is "type:<subscription_type>". For example, to get the latest event from
|
||||
your Ethereum transaction pool subscriptions, you would use "type:ethereum_txpool".
|
||||
|
||||
Returns: A list of the latest events in your stream.
|
||||
"""
|
||||
self.requires_authorization()
|
||||
query_params: Dict[str, str] = {}
|
||||
if q:
|
||||
query_params["q"] = q
|
||||
r = self._session.get(
|
||||
self.api.endpoints[ENDPOINT_STREAMS_LATEST], params=query_params
|
||||
)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def next_event(
|
||||
self, end_time: int, include_end: bool = True, q: str = ""
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Return the earliest event in your stream that occurred after the given end_time.
|
||||
|
||||
Arguments:
|
||||
- end_time - Time after which you want to retrieve the earliest event from your stream.
|
||||
- include_end - If True, the result is the first event that occurred in your stream strictly
|
||||
*after* the end time. If False, then you will get the first event that occurred in your
|
||||
stream *on* or *after* the end time.
|
||||
- q - Optional query to filter over your available subscriptions and subscription types.
|
||||
|
||||
Returns: None if no event has occurred after the given end time, else returns a dictionary
|
||||
representing that event.
|
||||
"""
|
||||
self.requires_authorization()
|
||||
query_params: Dict[str, Any] = {
|
||||
"end_time": end_time,
|
||||
"include_end": include_end,
|
||||
}
|
||||
if q:
|
||||
query_params["q"] = q
|
||||
r = self._session.get(
|
||||
self.api.endpoints[ENDPOINT_STREAMS_NEXT], params=query_params
|
||||
)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def previous_event(
|
||||
self, start_time: int, include_start: bool = True, q: str = ""
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Return the latest event in your stream that occurred before the given start_time.
|
||||
|
||||
Arguments:
|
||||
- start_time - Time before which you want to retrieve the latest event from your stream.
|
||||
- include_start - If True, the result is the last event that occurred in your stream strictly
|
||||
*before* the start time. If False, then you will get the last event that occurred in your
|
||||
stream *on* or *before* the start time.
|
||||
- q - Optional query to filter over your available subscriptions and subscription types.
|
||||
|
||||
Returns: None if no event has occurred before the given start time, else returns a dictionary
|
||||
representing that event.
|
||||
"""
|
||||
self.requires_authorization()
|
||||
query_params: Dict[str, Any] = {
|
||||
"start_time": start_time,
|
||||
"include_start": include_start,
|
||||
}
|
||||
if q:
|
||||
query_params["q"] = q
|
||||
r = self._session.get(
|
||||
self.api.endpoints[ENDPOINT_STREAMS_PREVIOUS], params=query_params
|
||||
)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def events(
|
||||
def create_query(
|
||||
self,
|
||||
start_time: int,
|
||||
end_time: int,
|
||||
include_start: bool = False,
|
||||
include_end: bool = False,
|
||||
q: str = "",
|
||||
) -> Dict[str, Any]:
|
||||
token: Union[str, uuid.UUID],
|
||||
query: str,
|
||||
name: str,
|
||||
public: bool = False,
|
||||
auth_type: AuthType = AuthType.bearer,
|
||||
timeout: float = MOONSTREAM_REQUEST_TIMEOUT,
|
||||
) -> MoonstreamQuery:
|
||||
"""
|
||||
Return all events in your stream that occurred between the given start and end times.
|
||||
|
||||
Arguments:
|
||||
- start_time - Time after which you want to query your stream.
|
||||
- include_start - Whether or not events that occurred exactly at the start_time should be included in the results.
|
||||
- end_time - Time before which you want to query your stream.
|
||||
- include_end - Whether or not events that occurred exactly at the end_time should be included in the results.
|
||||
- q - Optional query to filter over your available subscriptions and subscription types.
|
||||
|
||||
Returns: A dictionary representing the results of your query.
|
||||
Creates new query.
|
||||
"""
|
||||
self.requires_authorization()
|
||||
query_params: Dict[str, Any] = {
|
||||
"start_time": start_time,
|
||||
"include_start": include_start,
|
||||
"end_time": end_time,
|
||||
"include_end": include_end,
|
||||
json = {
|
||||
"query": query,
|
||||
"name": name,
|
||||
"public": public,
|
||||
}
|
||||
if q:
|
||||
query_params["q"] = q
|
||||
headers = {
|
||||
"Authorization": f"{auth_type.value} {token}",
|
||||
}
|
||||
response = self._call(
|
||||
method=Method.POST,
|
||||
url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/",
|
||||
headers=headers,
|
||||
json=json,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
r = self._session.get(self.api.endpoints[ENDPOINT_STREAMS], params=query_params)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
return MoonstreamQuery(
|
||||
id=response["id"],
|
||||
journal_url=response["journal_url"],
|
||||
name=response["title"],
|
||||
query=response["content"],
|
||||
tags=response["tags"],
|
||||
created_at=response["created_at"],
|
||||
updated_at=response["updated_at"],
|
||||
)
|
||||
|
||||
def create_stream(
|
||||
def list_queries(
|
||||
self,
|
||||
start_time: int,
|
||||
end_time: Optional[int] = None,
|
||||
q: str = "",
|
||||
) -> Generator[Dict[str, Any], None, None]:
|
||||
token: Union[str, uuid.UUID],
|
||||
auth_type: AuthType = AuthType.bearer,
|
||||
timeout: float = MOONSTREAM_REQUEST_TIMEOUT,
|
||||
) -> MoonstreamQueries:
|
||||
"""
|
||||
Return a stream of event. Event packs will be generated with 1 hour time range.
|
||||
|
||||
Arguments:
|
||||
- start_time - One of time border.
|
||||
- end_time - Time until the end of stream, if set to None stream will be going forward endlessly.
|
||||
- q - Optional query to filter over your available subscriptions and subscription types.
|
||||
|
||||
Returns: A dictionary stream representing the results of your query.
|
||||
Returns list of all queries available to user.
|
||||
"""
|
||||
# TODO(kompotkot): Add tests
|
||||
shift_two_hours = 2 * 60 * 60 # 2 hours
|
||||
shift_half_hour = 1 * 30 * 30 # 30 min
|
||||
headers = {
|
||||
"Authorization": f"{auth_type.value} {token}",
|
||||
}
|
||||
response = self._call(
|
||||
method=Method.GET,
|
||||
url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/list",
|
||||
headers=headers,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
def fetch_events(
|
||||
modified_start_time: int, modified_end_time: int
|
||||
) -> Generator[Tuple[Dict[str, Any], bool], None, None]:
|
||||
# If it is going from top to bottom in history,
|
||||
# then time_range will be reversed
|
||||
reversed_time = False
|
||||
if modified_start_time > modified_end_time:
|
||||
reversed_time = True
|
||||
max_boundary = max(modified_start_time, modified_end_time)
|
||||
min_boundary = min(modified_start_time, modified_end_time)
|
||||
|
||||
time_range_list = []
|
||||
# 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}]
|
||||
if max_boundary - min_boundary > shift_half_hour:
|
||||
for i in range(min_boundary, max_boundary, shift_half_hour):
|
||||
end_i = (
|
||||
i + shift_half_hour - 1
|
||||
if i + shift_half_hour <= max_boundary
|
||||
else max_boundary
|
||||
)
|
||||
time_range_list.append({"start_time": i, "end_time": end_i})
|
||||
else:
|
||||
time_range_list.append(
|
||||
{"start_time": min_boundary, "end_time": max_boundary}
|
||||
return MoonstreamQueries(
|
||||
queries=[
|
||||
MoonstreamQuery(
|
||||
id=query["entry_id"],
|
||||
name=query["name"],
|
||||
query_type=query["type"],
|
||||
user=query["user"],
|
||||
user_id=query["user_id"],
|
||||
)
|
||||
if reversed_time:
|
||||
time_range_list.reverse()
|
||||
for query in response
|
||||
]
|
||||
)
|
||||
|
||||
for time_range in time_range_list:
|
||||
r_json = self.events(
|
||||
start_time=time_range["start_time"],
|
||||
end_time=time_range["end_time"],
|
||||
include_start=True,
|
||||
include_end=True,
|
||||
q=q,
|
||||
)
|
||||
def exec_query(
|
||||
self,
|
||||
token: Union[str, uuid.UUID],
|
||||
name: str,
|
||||
params: Dict[str, Any] = {},
|
||||
auth_type: AuthType = AuthType.bearer,
|
||||
timeout: float = MOONSTREAM_REQUEST_TIMEOUT,
|
||||
) -> MoonstreamQueryResultUrl:
|
||||
"""
|
||||
Executes queries and upload data to external storage.
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"{auth_type.value} {token}",
|
||||
}
|
||||
json = {
|
||||
"params": params,
|
||||
}
|
||||
response = self._call(
|
||||
method=Method.POST,
|
||||
url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/{name}/update_data",
|
||||
headers=headers,
|
||||
json=json,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
yield r_json, reversed_time
|
||||
return MoonstreamQueryResultUrl(url=response["url"])
|
||||
|
||||
time_range_list = time_range_list[:]
|
||||
|
||||
if end_time is None:
|
||||
float_start_time = start_time
|
||||
|
||||
while True:
|
||||
end_time = int(self.server_time())
|
||||
# If time range is greater then 2 hours,
|
||||
# shift float_start time close to end_time to prevent stream block
|
||||
if end_time - float_start_time > shift_two_hours:
|
||||
float_start_time = shift_two_hours
|
||||
for r_json, reversed_time in fetch_events(float_start_time, end_time):
|
||||
|
||||
yield r_json
|
||||
|
||||
events = r_json.get("events", [])
|
||||
if len(events) > 0:
|
||||
# Updating float_start_time after first iteration to last event time
|
||||
if reversed_time:
|
||||
float_start_time = events[-1].get("event_timestamp") - 1
|
||||
else:
|
||||
float_start_time = events[0].get("event_timestamp") + 1
|
||||
|
||||
else:
|
||||
# If there are no events in response, wait
|
||||
# until new will be added
|
||||
time.sleep(5)
|
||||
else:
|
||||
for r_json, reversed_time in fetch_events(start_time, end_time):
|
||||
yield r_json
|
||||
|
||||
|
||||
def client_from_env() -> Moonstream:
|
||||
"""
|
||||
Produces a Moonstream client instantiated using the following environment variables:
|
||||
- MOONSTREAM_API_URL: Specifies the url parameter on the Moonstream client
|
||||
- MOONSTREAM_TIMEOUT_SECONDS: Specifies the request timeout
|
||||
- MOONSTREAM_ACCESS_TOKEN: If this environment variable is defined, the client sets this token as
|
||||
the authorization header for all Moonstream API requests.
|
||||
"""
|
||||
kwargs: Dict[str, Any] = {}
|
||||
|
||||
url = os.environ.get("MOONSTREAM_API_URL")
|
||||
if url is not None:
|
||||
kwargs["url"] = url
|
||||
|
||||
raw_timeout = os.environ.get("MOONSTREAM_TIMEOUT_SECONDS")
|
||||
timeout: Optional[float] = None
|
||||
if raw_timeout is not None:
|
||||
def download_query_results(
|
||||
self,
|
||||
url: str,
|
||||
output_type: OutputType = OutputType.JSON,
|
||||
timeout: float = MOONSTREAM_REQUEST_TIMEOUT,
|
||||
**kwargs,
|
||||
) -> Any:
|
||||
"""
|
||||
Fetch results of query from url.
|
||||
"""
|
||||
try:
|
||||
timeout = float(raw_timeout)
|
||||
except:
|
||||
raise ValueError(
|
||||
f"Could not convert MOONSTREAM_TIMEOUT_SECONDS ({raw_timeout}) to float."
|
||||
response = requests.request(
|
||||
Method.GET.value, url=url, timeout=timeout, **kwargs
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
raise Exception(str(e))
|
||||
|
||||
kwargs["timeout"] = timeout
|
||||
output = response
|
||||
if output_type == OutputType.JSON:
|
||||
output = response.json()
|
||||
|
||||
moonstream_client = Moonstream(**kwargs)
|
||||
return output
|
||||
|
||||
access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN")
|
||||
if access_token is not None:
|
||||
moonstream_client.authorize(access_token)
|
||||
def upload_query_results(
|
||||
self, data: str, bucket: str, key: str, metadata: Dict[str, Any] = {}
|
||||
) -> str:
|
||||
"""
|
||||
Uploads data to AWS S3 bucket.
|
||||
|
||||
return moonstream_client
|
||||
Requirements: "pip install -e .[aws]" with "boto3" module.
|
||||
"""
|
||||
try:
|
||||
url = upload_to_aws_s3_bucket(
|
||||
data=data, bucket=bucket, key=key, metadata=metadata
|
||||
)
|
||||
except Exception as e:
|
||||
raise Exception(str(e))
|
||||
|
||||
return url
|
||||
|
||||
def delete_query(
|
||||
self,
|
||||
token: Union[str, uuid.UUID],
|
||||
name: str,
|
||||
auth_type: AuthType = AuthType.bearer,
|
||||
timeout: float = MOONSTREAM_REQUEST_TIMEOUT,
|
||||
) -> uuid.UUID:
|
||||
"""
|
||||
Deletes query specified by name.
|
||||
"""
|
||||
headers = {
|
||||
"Authorization": f"{auth_type.value} {token}",
|
||||
}
|
||||
response = self._call(
|
||||
method=Method.DELETE,
|
||||
url=f"{self.api.endpoints[ENDPOINT_QUERIES]}/{name}",
|
||||
headers=headers,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
return response["id"]
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class APISpec:
|
||||
url: str
|
||||
endpoints: Dict[str, str]
|
||||
|
||||
|
||||
class AuthType(Enum):
|
||||
bearer = "Bearer"
|
||||
web3 = "Web3"
|
||||
|
||||
|
||||
class Method(Enum):
|
||||
DELETE = "delete"
|
||||
GET = "get"
|
||||
POST = "post"
|
||||
PUT = "put"
|
||||
|
||||
|
||||
class OutputType(Enum):
|
||||
CSV = "csv"
|
||||
JSON = "json"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MoonstreamQuery:
|
||||
id: uuid.UUID
|
||||
name: str
|
||||
journal_url: Optional[str] = None
|
||||
query: Optional[str] = None
|
||||
tags: Optional[List[str]] = None
|
||||
user: Optional[str] = None
|
||||
user_id: Optional[uuid.UUID] = None
|
||||
query_type: Optional[str] = None
|
||||
created_at: Optional[datetime] = None
|
||||
updated_at: Optional[datetime] = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MoonstreamQueries:
|
||||
queries: List[MoonstreamQuery]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MoonstreamQueryResultUrl:
|
||||
url: str
|
|
@ -0,0 +1,24 @@
|
|||
from typing import Any, Optional
|
||||
|
||||
|
||||
class MoonstreamResponseException(Exception):
|
||||
"""
|
||||
Raised when Moonstream server response with error.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message,
|
||||
status_code: int,
|
||||
detail: Optional[Any] = None,
|
||||
) -> None:
|
||||
super().__init__(message)
|
||||
self.status_code = status_code
|
||||
if detail is not None:
|
||||
self.detail = detail
|
||||
|
||||
|
||||
class MoonstreamUnexpectedResponse(Exception):
|
||||
"""
|
||||
Raised when Moonstream server response is unexpected (e.g. unparseable).
|
||||
"""
|
|
@ -0,0 +1,13 @@
|
|||
import os
|
||||
|
||||
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
|
||||
|
||||
MOONSTREAM_REQUEST_TIMEOUT = 10
|
||||
MOONSTREAM_REQUEST_TIMEOUT_RAW = os.environ.get("MOONSTREAM_REQUEST_TIMEOUT")
|
||||
try:
|
||||
if MOONSTREAM_REQUEST_TIMEOUT_RAW is not None:
|
||||
MOONSTREAM_REQUEST_TIMEOUT = int(MOONSTREAM_REQUEST_TIMEOUT_RAW)
|
||||
except:
|
||||
raise Exception(
|
||||
f"Could not parse MOONSTREAM_REQUEST_TIMEOUT environment variable as int: {MOONSTREAM_REQUEST_TIMEOUT_RAW}"
|
||||
)
|
|
@ -1,138 +1,54 @@
|
|||
from dataclasses import FrozenInstanceError
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from . import client
|
||||
|
||||
|
||||
class TestMoonstreamClient(unittest.TestCase):
|
||||
def test_client_init(self):
|
||||
m = client.Moonstream()
|
||||
self.assertEqual(m.api.url, "https://api.moonstream.to")
|
||||
self.assertIsNone(m.timeout)
|
||||
self.assertGreater(len(m.api.endpoints), 0)
|
||||
|
||||
def test_client_init_with_timeout(self):
|
||||
timeout = 7
|
||||
m = client.Moonstream(timeout=timeout)
|
||||
self.assertEqual(m.api.url, "https://api.moonstream.to")
|
||||
self.assertEqual(m.timeout, timeout)
|
||||
self.assertGreater(len(m.api.endpoints), 0)
|
||||
|
||||
def test_client_with_custom_url_and_timeout(self):
|
||||
timeout = 9
|
||||
url = "https://my.custom.api.url"
|
||||
m = client.Moonstream(url=url, timeout=timeout)
|
||||
self.assertEqual(m.api.url, url)
|
||||
self.assertEqual(m.timeout, timeout)
|
||||
self.assertGreater(len(m.api.endpoints), 0)
|
||||
|
||||
def test_client_with_custom_messy_url_and_timeout(self):
|
||||
timeout = 3.5
|
||||
url = "https://my.custom.api.url/"
|
||||
m = client.Moonstream(url=url, timeout=timeout)
|
||||
self.assertEqual(m.api.url, url)
|
||||
self.assertEqual(m.timeout, timeout)
|
||||
self.assertGreater(len(m.api.endpoints), 0)
|
||||
|
||||
def test_client_with_custom_messy_url_no_protocol_and_timeout(self):
|
||||
timeout = 5.5
|
||||
url = "my.custom.api.url/"
|
||||
m = client.Moonstream(url=url, timeout=timeout)
|
||||
self.assertEqual(m.api.url, url)
|
||||
self.assertEqual(m.timeout, timeout)
|
||||
self.assertGreater(len(m.api.endpoints), 0)
|
||||
|
||||
def test_immutable_api_url(self):
|
||||
m = client.Moonstream()
|
||||
with self.assertRaises(FrozenInstanceError):
|
||||
m.api.url = "lol"
|
||||
|
||||
def test_immutable_api_endpoints(self):
|
||||
m = client.Moonstream()
|
||||
with self.assertRaises(FrozenInstanceError):
|
||||
m.api.endpoints = {}
|
||||
|
||||
def test_mutable_timeout(self):
|
||||
original_timeout = 5.0
|
||||
updated_timeout = 10.5
|
||||
m = client.Moonstream(timeout=original_timeout)
|
||||
self.assertEqual(m.timeout, original_timeout)
|
||||
m.timeout = updated_timeout
|
||||
self.assertEqual(m.timeout, updated_timeout)
|
||||
|
||||
|
||||
class TestMoonstreamClientFromEnv(unittest.TestCase):
|
||||
class TestMoonstreamCalls(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.old_moonstream_api_url = os.environ.get("MOONSTREAM_API_URL")
|
||||
self.old_moonstream_timeout_seconds = os.environ.get(
|
||||
"MOONSTREAM_TIMEOUT_SECONDS"
|
||||
url = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
|
||||
self.token = os.environ.get("MOONSTREAM_ACCESS_TOKEN")
|
||||
if self.token is None:
|
||||
raise Exception("MOONSTREAM_ACCESS_TOKEN should be specified")
|
||||
self.m = client.Moonstream(moonstream_api_url=url)
|
||||
|
||||
queries = self.m.list_queries(self.token)
|
||||
for query in queries.queries:
|
||||
if query.name.startswith("test_query_name"):
|
||||
self.m.delete_query(self.token, query.name)
|
||||
|
||||
def test_ping(self):
|
||||
response = self.m.ping()
|
||||
self.assertEqual(response["status"], "ok")
|
||||
|
||||
def test_create_query(self):
|
||||
query = "SELECT count(*) FROM polygon_blocks"
|
||||
name = "test-query-name-1"
|
||||
response = self.m.create_query(self.token, query, name)
|
||||
self.assertEqual(f"Query:{name.replace('-', '_')}", response.name)
|
||||
|
||||
def test_list_queries(self):
|
||||
query = (
|
||||
"SELECT hash,block_number FROM polygon_blocks WHERE block_number = 21175765"
|
||||
)
|
||||
self.old_moonstream_access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN")
|
||||
name = "test-query-name-2"
|
||||
response_1 = self.m.create_query(self.token, query, name)
|
||||
self.assertEqual(f"Query:{name.replace('-', '_')}", response_1.name)
|
||||
|
||||
self.moonstream_api_url = "https://custom.example.com"
|
||||
self.moonstream_timeout_seconds = 15.333333
|
||||
self.moonstream_access_token = "1d431ca4-af9b-4c3a-b7b9-3cc79f3b0900"
|
||||
response_2 = self.m.list_queries(self.token)
|
||||
self.assertGreaterEqual(len(response_2.queries), 1)
|
||||
|
||||
os.environ["MOONSTREAM_API_URL"] = self.moonstream_api_url
|
||||
os.environ["MOONSTREAM_TIMEOUT_SECONDS"] = str(self.moonstream_timeout_seconds)
|
||||
os.environ["MOONSTREAM_ACCESS_TOKEN"] = self.moonstream_access_token
|
||||
def test_delete_query(self):
|
||||
query = "SELECT 1"
|
||||
name = "test-query-name-0"
|
||||
response_1 = self.m.create_query(self.token, query, name)
|
||||
self.assertEqual(f"Query:{name.replace('-', '_')}", response_1.name)
|
||||
|
||||
response_2 = self.m.delete_query(self.token, name.replace("-", "_"))
|
||||
self.assertEqual(response_1.id, response_2)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
del os.environ["MOONSTREAM_API_URL"]
|
||||
del os.environ["MOONSTREAM_TIMEOUT_SECONDS"]
|
||||
del os.environ["MOONSTREAM_ACCESS_TOKEN"]
|
||||
|
||||
if self.old_moonstream_api_url is not None:
|
||||
os.environ["MOONSTREAM_API_URL"] = self.old_moonstream_api_url
|
||||
if self.old_moonstream_timeout_seconds is not None:
|
||||
os.environ[
|
||||
"MOONSTREAM_TIMEOUT_SECONDS"
|
||||
] = self.old_moonstream_timeout_seconds
|
||||
if self.old_moonstream_access_token is not None:
|
||||
os.environ["MOONSTREAM_ACCESS_TOKEN"] = self.old_moonstream_access_token
|
||||
|
||||
def test_client_from_env(self):
|
||||
m = client.client_from_env()
|
||||
self.assertEqual(m.api.url, self.moonstream_api_url)
|
||||
self.assertEqual(m.timeout, self.moonstream_timeout_seconds)
|
||||
self.assertIsNone(m.requires_authorization())
|
||||
|
||||
authorization_header = m._session.headers["Authorization"]
|
||||
self.assertEqual(authorization_header, f"Bearer {self.moonstream_access_token}")
|
||||
|
||||
|
||||
class TestMoonstreamEndpoints(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.url = "https://api.moonstream.to"
|
||||
self.normalized_url = "https://api.moonstream.to"
|
||||
|
||||
def test_moonstream_endpoints(self):
|
||||
endpoints = client.moonstream_endpoints(self.url)
|
||||
self.assertDictEqual(
|
||||
endpoints,
|
||||
{
|
||||
client.ENDPOINT_PING: f"{self.normalized_url}{client.ENDPOINT_PING}",
|
||||
client.ENDPOINT_VERSION: f"{self.normalized_url}{client.ENDPOINT_VERSION}",
|
||||
client.ENDPOINT_NOW: f"{self.normalized_url}{client.ENDPOINT_NOW}",
|
||||
client.ENDPOINT_TOKEN: f"{self.normalized_url}{client.ENDPOINT_TOKEN}",
|
||||
client.ENDPOINT_SUBSCRIPTION_TYPES: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTION_TYPES}",
|
||||
client.ENDPOINT_SUBSCRIPTIONS: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTIONS}",
|
||||
client.ENDPOINT_STREAMS: f"{self.normalized_url}{client.ENDPOINT_STREAMS}",
|
||||
client.ENDPOINT_STREAMS_LATEST: f"{self.normalized_url}{client.ENDPOINT_STREAMS_LATEST}",
|
||||
client.ENDPOINT_STREAMS_NEXT: f"{self.normalized_url}{client.ENDPOINT_STREAMS_NEXT}",
|
||||
client.ENDPOINT_STREAMS_PREVIOUS: f"{self.normalized_url}{client.ENDPOINT_STREAMS_PREVIOUS}",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class TestMoonstreamEndpointsMessyURL(TestMoonstreamEndpoints):
|
||||
def setUp(self):
|
||||
self.url = "https://api.moonstream.to/"
|
||||
self.normalized_url = "https://api.moonstream.to"
|
||||
|
||||
|
||||
class TestMoonstreamEndpointsMessyURLWithNoProtocol(TestMoonstreamEndpoints):
|
||||
def setUp(self):
|
||||
self.url = "api.moonstream.to/"
|
||||
self.normalized_url = "http://api.moonstream.to"
|
||||
queries = self.m.list_queries(self.token)
|
||||
for query in queries.queries:
|
||||
if query.name.startswith("test_query_name"):
|
||||
self.m.delete_query(self.token, query.name)
|
||||
|
|
|
@ -1 +1 @@
|
|||
MOONSTREAM_CLIENT_VERSION = "0.0.3"
|
||||
MOONSTREAM_CLIENT_VERSION = "0.1.1"
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
[mypy]
|
||||
|
||||
[mypy-boto3.*]
|
||||
ignore_missing_imports = True
|
|
@ -0,0 +1,3 @@
|
|||
# Tests variables
|
||||
export MOONSTREAM_API_URL="https://api.moonstream.to"
|
||||
export MOONSTREAM_ACCESS_TOKEN="<access_token_for_tests>"
|
|
@ -11,8 +11,9 @@ setup(
|
|||
version=MOONSTREAM_CLIENT_VERSION,
|
||||
packages=find_packages(),
|
||||
package_data={"moonstream": ["py.typed"]},
|
||||
install_requires=["requests", "dataclasses; python_version=='3.6'"],
|
||||
install_requires=["requests", "pydantic", "dataclasses; python_version=='3.6'"],
|
||||
extras_require={
|
||||
"aws": ["boto3"],
|
||||
"dev": [
|
||||
"black",
|
||||
"mypy",
|
||||
|
|
|
@ -1,12 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
set -e
|
||||
TAG="clients/python/v$(python setup.py --version)"
|
||||
read -r -p "Tag: $TAG -- tag and push (y/n)?" ACCEPT
|
||||
if [ "$ACCEPT" = "y" ]
|
||||
then
|
||||
echo "Tagging and pushing: $TAG..."
|
||||
git tag "$TAG"
|
||||
git push upstream "$TAG"
|
||||
else
|
||||
echo "noop"
|
||||
fi
|
Ładowanie…
Reference in New Issue