kopia lustrzana https://github.com/bugout-dev/moonstream
Stream create with None end time
rodzic
17e92352dd
commit
b0c5d75375
|
@ -405,51 +405,66 @@ class Moonstream:
|
||||||
|
|
||||||
Returns: A dictionary stream representing the results of your query.
|
Returns: A dictionary stream representing the results of your query.
|
||||||
"""
|
"""
|
||||||
shift = 2 * 60 * 60
|
shift = 1 * 60 * 60
|
||||||
|
|
||||||
|
def fetch_events(modified_start_time: int, modified_end_time: int) -> int:
|
||||||
|
# If going to bottom of history, reverse_list after time_range
|
||||||
|
# will be generated
|
||||||
|
reversed_time = False
|
||||||
|
if modified_start_time > modified_end_time:
|
||||||
|
reversed_time = True
|
||||||
|
max_boundary = max(modified_start_time, modified_end_time)
|
||||||
|
min_boundary = min(modified_start_time, modified_end_time)
|
||||||
|
|
||||||
|
time_range_list = []
|
||||||
|
# 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}]
|
||||||
|
if max_boundary - min_boundary > shift:
|
||||||
|
for i in range(min_boundary, max_boundary, shift):
|
||||||
|
end_i = i + shift - 1 if i + shift <= max_boundary else max_boundary
|
||||||
|
time_range_list.append({"start_time": i, "end_time": end_i})
|
||||||
|
else:
|
||||||
|
time_range_list.append(
|
||||||
|
{"start_time": min_boundary, "end_time": max_boundary}
|
||||||
|
)
|
||||||
|
if reversed_time:
|
||||||
|
time_range_list.reverse()
|
||||||
|
|
||||||
|
for time_range in time_range_list:
|
||||||
|
r_json = self.events(
|
||||||
|
start_time=time_range["start_time"],
|
||||||
|
end_time=time_range["end_time"],
|
||||||
|
include_start=True,
|
||||||
|
include_end=True,
|
||||||
|
q=q,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield r_json, reversed_time
|
||||||
|
|
||||||
|
time_range_list = time_range_list[:]
|
||||||
|
|
||||||
continues = False
|
|
||||||
if end_time is None:
|
if end_time is None:
|
||||||
continues = True
|
# Update float_start_time after first iteration to last event time
|
||||||
end_time = int(self.server_time())
|
float_start_time = start_time
|
||||||
|
while True:
|
||||||
|
end_time = int(self.server_time())
|
||||||
|
for r_json, reversed_time in fetch_events(float_start_time, end_time):
|
||||||
|
|
||||||
# If going to bottom of history, reverse_list after time_range
|
yield r_json
|
||||||
# will be generated
|
|
||||||
reversed_time = False
|
|
||||||
if start_time > end_time:
|
|
||||||
reversed_time = True
|
|
||||||
max_boundary = max(start_time, end_time)
|
|
||||||
min_boundary = min(start_time, end_time)
|
|
||||||
|
|
||||||
time_range_list = []
|
float_start_time = (
|
||||||
# 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}]
|
end_time + 1 if not reversed_time else end_time - 1
|
||||||
if max_boundary - min_boundary > shift:
|
)
|
||||||
for i in range(min_boundary, max_boundary, shift):
|
events = r_json.get("events")
|
||||||
end_i = i + shift - 1 if i + shift <= max_boundary else max_boundary
|
if len(events) == 0:
|
||||||
time_range_list.append({"start_time": i, "end_time": end_i})
|
# if reversed_time:
|
||||||
|
# last_event_time = events[-1].get("event_timestamp")
|
||||||
|
# else:
|
||||||
|
# last_event_time = events[0].get("event_timestamp")
|
||||||
|
# float_start_time = last_event_time
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
else:
|
else:
|
||||||
time_range_list.append(
|
fetch_events(start_time, end_time)
|
||||||
{"start_time": min_boundary, "end_time": max_boundary}
|
|
||||||
)
|
|
||||||
if reversed_time:
|
|
||||||
time_range_list.reverse()
|
|
||||||
|
|
||||||
for time_range in time_range_list:
|
|
||||||
r_json = self.events(
|
|
||||||
start_time=time_range["start_time"],
|
|
||||||
end_time=time_range["end_time"],
|
|
||||||
include_start=True,
|
|
||||||
include_end=True,
|
|
||||||
q=q,
|
|
||||||
)
|
|
||||||
yield r_json
|
|
||||||
|
|
||||||
time_range_list = time_range_list[:]
|
|
||||||
|
|
||||||
events = r_json.get("events")
|
|
||||||
if reversed_time:
|
|
||||||
last_event_time = events[-1].get("event_timestamp")
|
|
||||||
else:
|
|
||||||
last_event_time = events[0].get("event_timestamp")
|
|
||||||
|
|
||||||
|
|
||||||
def client_from_env() -> Moonstream:
|
def client_from_env() -> Moonstream:
|
||||||
|
|
Ładowanie…
Reference in New Issue