Stream creation from top to bottom without None

pull/457/head
kompotkot 2021-11-24 14:29:40 +00:00
rodzic 3daf597596
commit 17e92352dd
1 zmienionych plików z 66 dodań i 1 usunięć

Wyświetl plik

@ -1,6 +1,7 @@
import logging
import os
from dataclasses import dataclass, field
import time
from typing import Any, Dict, List, Optional
import requests
@ -98,7 +99,9 @@ class Moonstream:
self.timeout = timeout
self._session = requests.Session()
self._session.headers.update(
{"User-Agent": f"Moonstream Python client (version {MOONSTREAM_CLIENT_VERSION})"}
{
"User-Agent": f"Moonstream Python client (version {MOONSTREAM_CLIENT_VERSION})"
}
)
def ping(self) -> Dict[str, Any]:
@ -386,6 +389,68 @@ class Moonstream:
r.raise_for_status()
return r.json()
def create_stream(
self,
start_time: int,
end_time: Optional[int] = None,
q: str = "",
) -> Dict[str, Any]:
"""
Return a stream of event from specific timestamp.
Arguments:
- start_time - Time from you want to start receiving stream of events.
- end_time - Time until the end of stream, if set to None stream will be going forward.
- q - Optional query to filter over your available subscriptions and subscription types.
Returns: A dictionary stream representing the results of your query.
"""
shift = 2 * 60 * 60
continues = False
if end_time is None:
continues = True
end_time = int(self.server_time())
# If going to bottom of history, reverse_list after time_range
# will be generated
reversed_time = False
if start_time > end_time:
reversed_time = True
max_boundary = max(start_time, end_time)
min_boundary = min(start_time, end_time)
time_range_list = []
# 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}]
if max_boundary - min_boundary > shift:
for i in range(min_boundary, max_boundary, shift):
end_i = i + shift - 1 if i + shift <= max_boundary else max_boundary
time_range_list.append({"start_time": i, "end_time": end_i})
else:
time_range_list.append(
{"start_time": min_boundary, "end_time": max_boundary}
)
if reversed_time:
time_range_list.reverse()
for time_range in time_range_list:
r_json = self.events(
start_time=time_range["start_time"],
end_time=time_range["end_time"],
include_start=True,
include_end=True,
q=q,
)
yield r_json
time_range_list = time_range_list[:]
events = r_json.get("events")
if reversed_time:
last_event_time = events[-1].get("event_timestamp")
else:
last_event_time = events[0].get("event_timestamp")
def client_from_env() -> Moonstream:
"""