From 801e9f0c75d6e7a6c104a14db5a5b1455a107f93 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 25 Nov 2021 12:23:05 +0000 Subject: [PATCH] Working stream creation with howto in README --- clients/python/README.md | 90 +++++++++++++++++++++++++++++ clients/python/moonstream/client.py | 62 ++++++++++++-------- 2 files changed, 127 insertions(+), 25 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 7d68db78..0578a7c1 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -11,3 +11,93 @@ Install using `pip`: ```bash pip install moonstream ``` + +## Usage + +- Source environment variable with access token to Moonstream, you can create one on page https://moonstream.to/account/tokens/ + +```python +access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN") +``` + +- Create an object of Moonstream client and authorize + +```python +mc = Moonstream() +mc.authorize(access_token) +``` + +## create_stream method + +Return a stream of event for time range. + +**From timestamp to None** + +When `end_time` is not set. + +```python +for events in mc.create_stream( + start_time=1637834400, end_time=None, q="type:ethereum_blockchain" +): + event_timestamp_list = [e["event_timestamp"] for e in events["events"]] + print(event_timestamp_list) +``` + +In this case we will be receiving events from bottom of history to recent time in next order: + +```python +[1637836177, ..., 1637834440] +[1637837980, ..., 1637836226] +# Until we will get latest event, +# then we will be receiving empty lists +[] +[] +# Until new events will be available +[1637839306, 1637839306, 1637839306, 1637839306] +[] +# Continuing... +``` + +**From timestamp to timestamp, from bottom to top** + +When `start_time` is less then `end_time`. + +```python +for events in mc.create_stream( + start_time=1637839281, end_time=1637830890, q="type:ethereum_blockchain" +): + event_timestamp_list = [e["event_timestamp"] for e in events["events"]] + print(event_timestamp_list) +``` + +Stream of event packs will be generating from recent timestamp to older and inner list of transactions for each pack will be in most recent to older event timestamp range: + +```python +[1637839280, ..., 1637838094] +[1637838086, ..., 1637836340] +... +[1637834488, ..., 1637832699] +[1637832676, ..., 1637830903] +``` + +**From timestamp to timestamp, from top to bottom** + +When `start_time` is greater then `end_time`. + +```python +for events in mc.create_stream( + start_time=1637830890, end_time=1637839281, q="type:ethereum_blockchain" +): + event_timestamp_list = [e["event_timestamp"] for e in events["events"]] + print(event_timestamp_list) +``` + +You start receiving list of older events from bottom of history to newest: + +```python +[1637832676, ..., 1637830903] +[1637834488, ..., 1637832699] +... +[1637838086, ..., 1637836340] +[1637839280, ..., 1637838094] +``` diff --git a/clients/python/moonstream/client.py b/clients/python/moonstream/client.py index 224a2b6f..8ab9f5c2 100644 --- a/clients/python/moonstream/client.py +++ b/clients/python/moonstream/client.py @@ -1,8 +1,8 @@ import logging import os -from dataclasses import dataclass, field import time -from typing import Any, Dict, List, Optional +from dataclasses import dataclass, field +from typing import Any, Dict, Generator, List, Optional, Tuple import requests @@ -394,22 +394,25 @@ class Moonstream: start_time: int, end_time: Optional[int] = None, q: str = "", - ) -> Dict[str, Any]: + ) -> Generator[Dict[str, Any], None, None]: """ - Return a stream of event from specific timestamp. + Return a stream of event. Event packs will be generated with 1 hour time range. Arguments: - - start_time - Time from you want to start receiving stream of events. - - end_time - Time until the end of stream, if set to None stream will be going forward. + - start_time - One of time border. + - end_time - Time until the end of stream, if set to None stream will be going forward endlessly. - q - Optional query to filter over your available subscriptions and subscription types. Returns: A dictionary stream representing the results of your query. """ - shift = 1 * 60 * 60 + shift_two_hours = 2 * 60 * 60 # 2 hours + shift_half_hour = 1 * 30 * 30 # 30 min - def fetch_events(modified_start_time: int, modified_end_time: int) -> int: - # If going to bottom of history, reverse_list after time_range - # will be generated + def fetch_events( + modified_start_time: int, modified_end_time: int + ) -> Generator[Tuple[Dict[str, Any], bool], None, None]: + # If it is going from top to bottom in history, + # then time_range will be reversed reversed_time = False if modified_start_time > modified_end_time: reversed_time = True @@ -418,9 +421,13 @@ class Moonstream: time_range_list = [] # 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}] - if max_boundary - min_boundary > shift: - for i in range(min_boundary, max_boundary, shift): - end_i = i + shift - 1 if i + shift <= max_boundary else max_boundary + if max_boundary - min_boundary > shift_half_hour: + for i in range(min_boundary, max_boundary, shift_half_hour): + end_i = ( + i + shift_half_hour - 1 + if i + shift_half_hour <= max_boundary + else max_boundary + ) time_range_list.append({"start_time": i, "end_time": end_i}) else: time_range_list.append( @@ -443,28 +450,33 @@ class Moonstream: time_range_list = time_range_list[:] if end_time is None: - # Update float_start_time after first iteration to last event time float_start_time = start_time + while True: end_time = int(self.server_time()) + # If time range is greater then 2 hours, + # shift float_start time close to end_time to prevent stream block + if end_time - float_start_time > shift_two_hours: + float_start_time = shift_two_hours for r_json, reversed_time in fetch_events(float_start_time, end_time): yield r_json - float_start_time = ( - end_time + 1 if not reversed_time else end_time - 1 - ) - events = r_json.get("events") - if len(events) == 0: - # if reversed_time: - # last_event_time = events[-1].get("event_timestamp") - # else: - # last_event_time = events[0].get("event_timestamp") - # float_start_time = last_event_time + events = r_json.get("events", []) + if len(events) > 0: + # Updating float_start_time after first iteration to last event time + if reversed_time: + float_start_time = events[-1].get("event_timestamp") - 1 + else: + float_start_time = events[0].get("event_timestamp") + 1 + else: + # If there are no events in response, wait + # until new will be added time.sleep(5) else: - fetch_events(start_time, end_time) + for r_json, reversed_time in fetch_events(start_time, end_time): + yield r_json def client_from_env() -> Moonstream: