Basic Moonstream API client done.

pull/266/head
Neeraj Kashyap 2021-09-20 16:49:29 -07:00
rodzic 9800dc474f
commit 21fa22089a
2 zmienionych plików z 129 dodań i 4 usunięć

Wyświetl plik

@ -1,7 +1,7 @@
from dataclasses import dataclass, field
import logging
import os
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
import requests
@ -21,6 +21,11 @@ ENDPOINT_NOW = "/now"
ENDPOINT_TOKEN = "/users/token"
ENDPOINT_SUBSCRIPTIONS = "/subscriptions/"
ENDPOINT_SUBSCRIPTION_TYPES = "/subscriptions/types"
ENDPOINT_STREAMS = "/streams/"
ENDPOINT_STREAMS_LATEST = "/streams/latest"
ENDPOINT_STREAMS_NEXT = "/streams/next"
ENDPOINT_STREAMS_PREVIOUS = "/streams/previous"
ENDPOINTS = [
ENDPOINT_PING,
ENDPOINT_VERSION,
@ -28,6 +33,10 @@ ENDPOINTS = [
ENDPOINT_TOKEN,
ENDPOINT_SUBSCRIPTIONS,
ENDPOINT_SUBSCRIPTION_TYPES,
ENDPOINT_STREAMS,
ENDPOINT_STREAMS_LATEST,
ENDPOINT_STREAMS_NEXT,
ENDPOINT_STREAMS_PREVIOUS,
]
@ -263,6 +272,120 @@ class Moonstream:
r.raise_for_status()
return r.json()
def latest_events(self, q: str = "") -> List[Dict[str, Any]]:
"""
Returns the latest events in your stream. You can optionally provide a query parameter to
constrain the query to specific subscription types or to specific subscriptions.
Arguments:
- q - Optional query (default is the empty string). The syntax to constrain to a particular
type of subscription is "type:<subscription_type>". For example, to get the latest event from
your Ethereum transaction pool subscriptions, you would use "type:ethereum_txpool".
Returns: A list of the latest events in your stream.
"""
self.requires_authorization()
query_params: Dict[str, str] = {}
if q:
query_params["q"] = q
r = self._session.get(
self.api.endpoints[ENDPOINT_STREAMS_LATEST], params=query_params
)
r.raise_for_status()
return r.json()
def next_event(
self, end_time: int, include_end: bool = True, q: str = ""
) -> Optional[Dict[str, Any]]:
"""
Return the earliest event in your stream that occurred after the given end_time.
Arguments:
- end_time - Time after which you want to retrieve the earliest event from your stream.
- include_end - If True, the result is the first event that occurred in your stream strictly
*after* the end time. If False, then you will get the first event that occurred in your
stream *on* or *after* the end time.
- q - Optional query to filter over your available subscriptions and subscription types.
Returns: None if no event has occurred after the given end time, else returns a dictionary
representing that event.
"""
self.requires_authorization()
query_params: Dict[str, Any] = {
"end_time": end_time,
"include_end": include_end,
}
if q:
query_params["q"] = q
r = self._session.get(
self.api.endpoints[ENDPOINT_STREAMS_NEXT], params=query_params
)
r.raise_for_status()
return r.json()
def previous_event(
self, start_time: int, include_start: bool = True, q: str = ""
) -> Optional[Dict[str, Any]]:
"""
Return the latest event in your stream that occurred before the given start_time.
Arguments:
- start_time - Time before which you want to retrieve the latest event from your stream.
- include_start - If True, the result is the last event that occurred in your stream strictly
*before* the start time. If False, then you will get the last event that occurred in your
stream *on* or *before* the start time.
- q - Optional query to filter over your available subscriptions and subscription types.
Returns: None if no event has occurred before the given start time, else returns a dictionary
representing that event.
"""
self.requires_authorization()
query_params: Dict[str, Any] = {
"start_time": start_time,
"include_start": include_start,
}
if q:
query_params["q"] = q
r = self._session.get(
self.api.endpoints[ENDPOINT_STREAMS_PREVIOUS], params=query_params
)
r.raise_for_status()
return r.json()
def events(
self,
start_time: int,
include_start: bool,
end_time: int,
include_end: bool,
q: str = "",
) -> Dict[str, Any]:
"""
Return all events in your stream that occurred between the given start and end times.
Arguments:
- start_time - Time after which you want to query your stream.
- include_start - Whether or not events that occurred exactly at the start_time should be included in the results.
- end_time - Time before which you want to query your stream.
- include_end - Whether or not events that occurred exactly at the end_time should be included in the results.
- q - Optional query to filter over your available subscriptions and subscription types.
Returns: A dictionary representing the results of your query.
"""
self.requires_authorization()
query_params: Dict[str, Any] = {
"start_time": start_time,
"include_start": include_start,
"end_time": end_time,
"include_end": include_end,
}
if q:
query_params["q"] = q
r = self._session.get(self.api.endpoints[ENDPOINT_STREAMS], params=query_params)
r.raise_for_status()
return r.json()
def client_from_env() -> Moonstream:
"""

Wyświetl plik

@ -99,9 +99,7 @@ class TestMoonstreamClientFromEnv(unittest.TestCase):
self.assertIsNone(m.requires_authorization())
authorization_header = m._session.headers["Authorization"]
self.assertTrue(authorization_header.startswith("Bearer "))
access_token = authorization_header[len("Bearer ") :]
self.assertEqual(access_token, self.moonstream_access_token)
self.assertEqual(authorization_header, f"Bearer {self.moonstream_access_token}")
class TestMoonstreamEndpoints(unittest.TestCase):
@ -120,6 +118,10 @@ class TestMoonstreamEndpoints(unittest.TestCase):
client.ENDPOINT_TOKEN: f"{self.normalized_url}{client.ENDPOINT_TOKEN}",
client.ENDPOINT_SUBSCRIPTION_TYPES: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTION_TYPES}",
client.ENDPOINT_SUBSCRIPTIONS: f"{self.normalized_url}{client.ENDPOINT_SUBSCRIPTIONS}",
client.ENDPOINT_STREAMS: f"{self.normalized_url}{client.ENDPOINT_STREAMS}",
client.ENDPOINT_STREAMS_LATEST: f"{self.normalized_url}{client.ENDPOINT_STREAMS_LATEST}",
client.ENDPOINT_STREAMS_NEXT: f"{self.normalized_url}{client.ENDPOINT_STREAMS_NEXT}",
client.ENDPOINT_STREAMS_PREVIOUS: f"{self.normalized_url}{client.ENDPOINT_STREAMS_PREVIOUS}",
},
)