Merge branch 'main' into add-events-provider

pull/475/head
Andrey Dolgolev 2021-12-02 00:32:18 +02:00
commit 9e00e83a64
17 changed files with 331 additions and 166 deletions

View file

@@ -31,8 +31,8 @@ jobs:
- name: Check that versions are synchronized
working-directory: ./clients/python
run: |
CLIENT_VERSION=$(python -c "from moonstream.client import CLIENT_VERSION; print(CLIENT_VERSION)")
MOONSTREAM_CLIENT_VERSION=$(python -c "from moonstream.client import MOONSTREAM_CLIENT_VERSION; print(MOONSTREAM_CLIENT_VERSION)")
SETUP_PY_VERSION=$(python setup.py --version)
echo "Client version: $CLIENT_VERSION"
echo "Client version: $MOONSTREAM_CLIENT_VERSION"
echo "setup.py version: $SETUP_PY_VERSION"
test "$CLIENT_VERSION" = "$SETUP_PY_VERSION"
test "$MOONSTREAM_CLIENT_VERSION" = "$SETUP_PY_VERSION"

View file

@@ -0,0 +1,3 @@
[settings]
profile = black
multi_line_output = 3

View file

@@ -6,7 +6,6 @@ import uuid
import boto3 # type: ignore
from bugout.data import BugoutSearchResults
from bugout.exceptions import BugoutResponseException
from bugout.journal import SearchOrder
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore

View file

@@ -11,7 +11,6 @@ from fastapi.middleware.cors import CORSMiddleware
from . import actions
from . import data
from .routes.address_info import router as addressinfo_router
from .routes.nft import router as nft_router
from .routes.streams import router as streams_router
from .routes.subscriptions import router as subscriptions_router
from .routes.txinfo import router as txinfo_router
@@ -32,7 +31,6 @@ tags_metadata = [
"name": "labels",
"description": "Labels for transactions, addresses with additional information.",
},
{"name": "nft", "description": "NFT market summaries."},
{"name": "streams", "description": "Operations with data streams and filters."},
{"name": "subscriptions", "description": "Operations with user subscriptions."},
{"name": "time", "description": "Server timestamp endpoints."},
@@ -124,7 +122,6 @@ async def status_handler() -> data.StatusResponse:
app.include_router(addressinfo_router)
app.include_router(nft_router)
app.include_router(streams_router)
app.include_router(subscriptions_router)
app.include_router(txinfo_router)

View file

@@ -1,13 +1,12 @@
"""
Pydantic schemas for the Moonstream HTTP API
"""
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict, Any, Union
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import BaseModel, Field
from datetime import datetime
USER_ONBOARDING_STATE = "onboarding_state"
@@ -154,6 +153,7 @@ class StreamBoundary(BaseModel):
end_time: Optional[int] = None
include_start: bool = False
include_end: bool = False
reversed_time: bool = False
class Event(BaseModel):
@@ -229,6 +229,10 @@ class OnboardingState(BaseModel):
steps: Dict[str, int]
class SubdcriptionsAbiResponse(BaseModel):
url: str
class DashboardMeta(BaseModel):
subscription_id: UUID
generic: Optional[List[Dict[str, str]]]

View file

@@ -24,8 +24,8 @@ if the order does not matter and you would rather emphasize speed. Only availabl
lists of events. (Default: True)
"""
from concurrent.futures import Future, ThreadPoolExecutor
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple
from bugout.app import Bugout
@@ -79,8 +79,14 @@ def get_events(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
for provider_name, provider in event_providers.items():
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.get_events,
db_session,
@@ -106,9 +112,15 @@ def get_events(
else:
raise ReceivingEventsException(e)
stream_boundary = [boundary for boundary, _ in results.values()][0]
events = [event for _, event_list in results.values() for event in event_list]
if sort_events:
events.sort(key=lambda event: event.event_timestamp, reverse=True)
# If the stream boundary was time-reversed, do not sort in reverse by timestamp;
# the events are already in the correct order
events.sort(
key=lambda event: event.event_timestamp,
reverse=not stream_boundary.reversed_time,
)
return (stream_boundary, events)
@@ -138,9 +150,14 @@ def latest_events(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers.items():
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.latest_events,
db_session,
@@ -165,7 +182,6 @@ def latest_events(
)
else:
raise ReceivingEventsException(e)
events = [event for event_list in results.values() for event in event_list]
if sort_events:
events.sort(key=lambda event: event.event_timestamp, reverse=True)
@@ -193,7 +209,14 @@ def next_event(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
for provider_name, provider in event_providers.items():
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.next_event,
db_session,
@@ -249,8 +272,14 @@ def previous_event(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
for provider_name, provider in event_providers.items():
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.previous_event,
db_session,
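
The provider-filtering step that now appears in each of the four functions above (get_events, latest_events, next_event, previous_event) can be read in isolation. Below is a self-contained sketch of it; the provider objects, event types, and query are made-up stand-ins for illustration, not Moonstream's real classes:

```python
# Standalone sketch of the event-provider filtering used above.
# FakeProvider and FakeQuery are hypothetical stand-ins for the real provider
# objects and StreamQuery; only the dict comprehension mirrors the diff.
from dataclasses import dataclass
from typing import List


@dataclass
class FakeProvider:
    event_type: str


@dataclass
class FakeQuery:
    subscription_types: List[str]


event_providers = {
    "whalewatch": FakeProvider(event_type="ethereum_whalewatch"),
    "txpool": FakeProvider(event_type="ethereum_txpool"),
    "transactions": FakeProvider(event_type="ethereum_blockchain"),
}
query = FakeQuery(subscription_types=["ethereum_blockchain", "ethereum_txpool"])

# Keep only providers whose event_type was actually requested in the query.
event_providers_filtered = {
    key: value
    for key, value in event_providers.items()
    if value.event_type in query.subscription_types
}
print(sorted(event_providers_filtered))  # ['transactions', 'txpool']
```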

View file

@@ -327,34 +327,6 @@ class TXPoolProvider(BugoutEventProvider):
return subscriptions_filters
class PublicDataProvider(BugoutEventProvider):
def __init__(
self,
event_type: str,
description: str,
default_time_interval_seconds: int,
estimated_events_per_time_interval: float,
tags: Optional[List[str]] = None,
batch_size: int = 100,
timeout: float = 30.0,
):
super().__init__(
event_type=event_type,
description=description,
default_time_interval_seconds=default_time_interval_seconds,
estimated_events_per_time_interval=estimated_events_per_time_interval,
tags=tags,
batch_size=batch_size,
timeout=timeout,
)
def parse_filters(
self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
) -> Optional[List[str]]:
return []
whalewatch_description = """Event provider for Ethereum whale watch.
Shows the top 10 addresses active on the Ethereum blockchain over the last hour in the following categories:
@@ -364,7 +336,7 @@ Shows the top 10 addresses active on the Ethereum blockchain over the last hour
4. Amount (in WEI) received
To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint."""
ethereum_whalewatch_provider = PublicDataProvider(
whalewatch_provider = BugoutEventProvider(
event_type="ethereum_whalewatch",
description=whalewatch_description,
default_time_interval_seconds=310,
@@ -372,7 +344,7 @@ ethereum_whalewatch_provider = PublicDataProvider(
tags=["crawl_type:ethereum_trending"],
)
polygon_whalewatch_provider = PublicDataProvider(
polygon_whalewatch_provider = BugoutEventProvider(
event_type="polygon_whalewatch",
description=whalewatch_description,
default_time_interval_seconds=310,
@@ -392,29 +364,3 @@ ethereum_txpool_provider = TXPoolProvider(
estimated_events_per_time_interval=50,
tags=[f"client:{ETHTXPOOL_HUMBUG_CLIENT_ID}"],
)
polygon_txpool_provider = TXPoolProvider(
event_type="polygon_txpool",
description=ethereum_txpool_description,
default_time_interval_seconds=5,
estimated_events_per_time_interval=50,
tags=[f"client:polygon_HUMBUG_CLIENT_ID"],
)
nft_summary_description = """Event provider for NFT market summaries.
This provider periodically generates NFT market summaries for the last hour of market activity.
Currently, it summarizes the activities on the following NFT markets:
1. The Ethereum market
This provider is currently not accessible for subscription. The data from this provider is publicly
available at the /nft endpoint."""
nft_summary_provider = PublicDataProvider(
event_type="nft_summary",
description=nft_summary_description,
# 40 blocks per summary, 15 seconds per block + 2 seconds wiggle room.
default_time_interval_seconds=40 * 17,
estimated_events_per_time_interval=1,
tags=["crawl_type:nft_ethereum"],
)

View file

@@ -78,7 +78,9 @@ class TransactionsProvider:
return False, errors
return True, errors
def stream_boundary_validator(self, stream_boundary: data.StreamBoundary) -> None:
def stream_boundary_validator(
self, stream_boundary: data.StreamBoundary
) -> data.StreamBoundary:
"""
Stream boundary validator for the transactions provider.
@@ -87,9 +89,10 @@ class TransactionsProvider:
Raises an error for invalid stream boundaries, else returns None.
"""
valid_period_seconds = self.valid_period_seconds
validate_stream_boundary(
_, stream_boundary = validate_stream_boundary(
stream_boundary, valid_period_seconds, raise_when_invalid=True
)
return stream_boundary
def default_filters(self, subscriptions: List[BugoutResource]) -> Filters:
"""
@@ -309,7 +312,7 @@ class TransactionsProvider:
If the query does not require any data from this provider, returns None.
"""
self.stream_boundary_validator(stream_boundary)
stream_boundary = self.stream_boundary_validator(stream_boundary)
parsed_filters = self.parse_filters(query, user_subscriptions)
if parsed_filters is None:

View file

@@ -1,59 +0,0 @@
"""
Moonstream's /nft endpoints.
These endpoints provide public access to NFT market summaries. No authentication required.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb import db
from sqlalchemy.orm import Session
from .. import data
from ..providers.bugout import nft_summary_provider
from ..settings import (
bugout_client,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
)
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/nft")
@router.get("/", tags=["streams"], response_model=data.GetEventsResponse)
async def stream_handler(
start_time: int = Query(0),
end_time: Optional[int] = Query(None),
include_start: bool = Query(False),
include_end: bool = Query(False),
db_session: Session = Depends(db.yield_db_session),
) -> data.GetEventsResponse:
stream_boundary = data.StreamBoundary(
start_time=start_time,
end_time=end_time,
include_start=include_start,
include_end=include_end,
)
result = nft_summary_provider.get_events(
db_session=db_session,
bugout_client=bugout_client,
data_journal_id=MOONSTREAM_DATA_JOURNAL_ID,
data_access_token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
stream_boundary=stream_boundary,
user_subscriptions={nft_summary_provider.event_type: []},
query=StreamQuery(subscription_types=[nft_summary_provider.event_type]),
)
if result is None:
return data.GetEventsResponse(stream_boundary=stream_boundary, events=[])
provider_stream_boundary, events = result
return data.GetEventsResponse(
stream_boundary=provider_stream_boundary, events=events
)

View file

@@ -314,6 +314,51 @@ async def update_subscriptions_handler(
)
@router.get(
"/{subscription_id}/abi",
tags=["subscriptions"],
response_model=data.SubdcriptionsAbiResponse,
)
async def get_subscription_abi_handler(
request: Request,
subscription_id: str,
) -> data.SubdcriptionsAbiResponse:
token = request.state.token
try:
subscription_resource: BugoutResource = bc.get_resource(
token=token,
resource_id=subscription_id,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if subscription_resource.resource_data["abi"] is None:
raise MoonstreamHTTPException(
status_code=404,
detail="Subscription abi not exists.",
)
s3_client = boto3.client("s3")
result_key = f"{subscription_resource.resource_data['s3_path']}"
presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": subscription_resource.resource_data["bucket"],
"Key": result_key,
},
ExpiresIn=300,
HttpMethod="GET",
)
return data.SubdcriptionsAbiResponse(url=presigned_url)
@router.get(
"/types", tags=["subscriptions"], response_model=data.SubscriptionTypesListResponse
)
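
A rough usage sketch for the new ABI endpoint follows; the API base URL, the bearer-token header format, and the subscription id are assumptions for illustration and are not confirmed by this diff:

```python
# Hypothetical client-side call to GET /subscriptions/{subscription_id}/abi.
# MOONSTREAM_API_URL, the Authorization header format, and the id are placeholders.
import os

import requests

MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
access_token = os.environ["MOONSTREAM_ACCESS_TOKEN"]
subscription_id = "00000000-0000-0000-0000-000000000000"  # placeholder UUID

response = requests.get(
    f"{MOONSTREAM_API_URL}/subscriptions/{subscription_id}/abi",
    headers={"Authorization": f"Bearer {access_token}"},
    timeout=10,
)
response.raise_for_status()
presigned_url = response.json()["url"]

# The presigned URL expires after 300 seconds; fetch the ABI directly from S3.
# This assumes the stored ABI is JSON.
abi = requests.get(presigned_url, timeout=10).json()
print(type(abi))
```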

View file

@@ -2,6 +2,7 @@
Utilities to work with stream boundaries.
"""
import time
from typing import Tuple
from .data import StreamBoundary
@@ -16,7 +17,7 @@ def validate_stream_boundary(
stream_boundary: StreamBoundary,
time_difference_seconds: int,
raise_when_invalid: bool = False,
) -> bool:
) -> Tuple[bool, StreamBoundary]:
"""
This function can be used by event providers to check if a stream boundary is valid according to their
requirements.
@@ -33,6 +34,16 @@
f"Stream boundary start and end times must not differ by more than {time_difference_seconds} seconds:\n{stream_boundary.json()}"
)
else:
return False
return False, stream_boundary
return True
# If a reversed-time stream of events was requested, normalize the boundary
if start_time > end_time:
include_start = stream_boundary.include_start
include_end = stream_boundary.include_end
stream_boundary.start_time = end_time
stream_boundary.end_time = start_time
stream_boundary.include_start = include_end
stream_boundary.include_end = include_start
stream_boundary.reversed_time = True
return True, stream_boundary
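
A self-contained sketch of what the new reversed-time handling does to a boundary whose start_time is later than its end_time. The StreamBoundary model is re-declared locally so the example runs on its own, and the real validator also enforces the time-difference limit shown above:

```python
# Simplified stand-in for validate_stream_boundary's reversed-time normalization.
from typing import Optional, Tuple

from pydantic import BaseModel


class StreamBoundary(BaseModel):
    start_time: int = 0
    end_time: Optional[int] = None
    include_start: bool = False
    include_end: bool = False
    reversed_time: bool = False


def normalize_reversed(boundary: StreamBoundary) -> Tuple[bool, StreamBoundary]:
    if boundary.end_time is not None and boundary.start_time > boundary.end_time:
        # Swap the endpoints (and their inclusivity flags) and remember the direction.
        boundary.start_time, boundary.end_time = boundary.end_time, boundary.start_time
        boundary.include_start, boundary.include_end = (
            boundary.include_end,
            boundary.include_start,
        )
        boundary.reversed_time = True
    return True, boundary


_, normalized = normalize_reversed(
    StreamBoundary(start_time=1637839281, end_time=1637830890, include_start=True)
)
print(normalized.start_time, normalized.end_time, normalized.reversed_time)
# 1637830890 1637839281 True
```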

View file

@@ -3,8 +3,8 @@ Tests for stream boundary utilities.
"""
import unittest
from .data import StreamBoundary
from . import stream_boundaries
from .data import StreamBoundary
class TestValidateStreamBoundary(unittest.TestCase):
@@ -12,45 +12,44 @@ class TestValidateStreamBoundary(unittest.TestCase):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
self.assertTrue(
stream_boundaries.validate_stream_boundary(
stream_boundary, 10, raise_when_invalid=False
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 10, raise_when_invalid=False
)
self.assertTrue(valid)
def test_invalid_stream_boundary(self):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
self.assertFalse(
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
self.assertFalse(valid)
def test_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
valid
def test_unconstrainted_invalid_stream_boundary(self):
stream_boundary = StreamBoundary()
self.assertFalse(
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
self.assertFalse(valid)
def test_unconstrained_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary()
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
valid
if __name__ == "__main__":

View file

@@ -0,0 +1,3 @@
[settings]
profile = black
multi_line_output = 3

View file

@@ -11,3 +11,93 @@ Install using `pip`:
```bash
pip install moonstream
```
## Usage
- Set an environment variable with your Moonstream access token; you can create one at https://moonstream.to/account/tokens/
```python
import os

access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN")
```
- Create a Moonstream client object and authorize
```python
from moonstream.client import Moonstream

mc = Moonstream()
mc.authorize(access_token)
```
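
- Optionally verify connectivity; a minimal check using the client's `ping` method, which returns the `/ping` endpoint response as a dictionary:

```python
print(mc.ping())
```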
## create_stream method
Returns a stream of events for a time range.
**From timestamp to None, from bottom to top**
When `end_time` is not set.
```python
for events in mc.create_stream(
start_time=1637834400, end_time=None, q="type:ethereum_blockchain"
):
event_timestamp_list = [e["event_timestamp"] for e in events["events"]]
print(event_timestamp_list)
```
In this case, events are received from the bottom of history toward the present, in the following order:
```python
[1637836177, ..., 1637834440]
[1637837980, ..., 1637836226]
# Until we reach the latest event,
# after which we receive empty lists
[]
[]
# until new events become available
[1637839306, 1637839306, 1637839306, 1637839306]
[]
# Continuing...
```
**From timestamp to timestamp, from top to bottom**
When `start_time` is greater than `end_time`.
```python
for events in mc.create_stream(
start_time=1637839281, end_time=1637830890, q="type:ethereum_blockchain"
):
event_timestamp_list = [e["event_timestamp"] for e in events["events"]]
print(event_timestamp_list)
```
Event packs are generated from the most recent timestamp toward older ones, and the events within each pack are likewise ordered from most recent to oldest:
```python
[1637839280, ..., 1637838094]
[1637838086, ..., 1637836340]
...
[1637834488, ..., 1637832699]
[1637832676, ..., 1637830903]
```
**From timestamp to timestamp, from bottom to top**
When `start_time` is less than `end_time`.
```python
for events in mc.create_stream(
start_time=1637830890, end_time=1637839281, q="type:ethereum_blockchain"
):
event_timestamp_list = [e["event_timestamp"] for e in events["events"]]
print(event_timestamp_list)
```
You receive lists of events from the bottom of history (oldest) to the newest:
```python
[1637832676, ..., 1637830903]
[1637834488, ..., 1637832699]
...
[1637838086, ..., 1637836340]
[1637839280, ..., 1637838094]
```
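
A consumption-pattern sketch for the open-ended mode: since `end_time=None` streams forever, break out of the loop yourself once you have enough events. The query string and the stop condition below are only examples:

```python
collected = []
for events in mc.create_stream(
    start_time=1637834400, end_time=None, q="type:ethereum_blockchain"
):
    collected.extend(events["events"])
    # Stop after gathering 1000 events; otherwise the generator never returns.
    if len(collected) >= 1000:
        break
print(len(collected))
```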

View file

@@ -1,20 +1,19 @@
from dataclasses import dataclass, field
import logging
import os
from typing import Any, Dict, List, Optional
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, List, Optional, Tuple
import requests
from .version import MOONSTREAM_CLIENT_VERSION
logger = logging.getLogger(__name__)
log_level = logging.INFO
if os.environ.get("DEBUG", "").lower() in ["true", "1"]:
log_level = logging.DEBUG
logger.setLevel(log_level)
# Keep this synchronized with the version in setup.py
CLIENT_VERSION = "0.0.2"
ENDPOINT_PING = "/ping"
ENDPOINT_VERSION = "/version"
ENDPOINT_NOW = "/now"
@@ -100,7 +99,9 @@ class Moonstream:
self.timeout = timeout
self._session = requests.Session()
self._session.headers.update(
{"User-Agent": f"Moonstream Python client (version {CLIENT_VERSION})"}
{
"User-Agent": f"Moonstream Python client (version {MOONSTREAM_CLIENT_VERSION})"
}
)
def ping(self) -> Dict[str, Any]:
@@ -388,6 +389,96 @@ class Moonstream:
r.raise_for_status()
return r.json()
def create_stream(
self,
start_time: int,
end_time: Optional[int] = None,
q: str = "",
) -> Generator[Dict[str, Any], None, None]:
"""
Return a stream of events, generated in consecutive time-range chunks.
Arguments:
- start_time - One boundary of the time range.
- end_time - The other boundary; if set to None, the stream runs forward endlessly.
- q - Optional query to filter over your available subscriptions and subscription types.
Returns: A generator of dictionaries representing the results of your query.
"""
# TODO(kompotkot): Add tests
shift_two_hours = 2 * 60 * 60 # 2 hours
shift_half_hour = 30 * 60  # 30 minutes
def fetch_events(
modified_start_time: int, modified_end_time: int
) -> Generator[Tuple[Dict[str, Any], bool], None, None]:
# If it is going from top to bottom in history,
# then time_range will be reversed
reversed_time = False
if modified_start_time > modified_end_time:
reversed_time = True
max_boundary = max(modified_start_time, modified_end_time)
min_boundary = min(modified_start_time, modified_end_time)
time_range_list = []
# 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}]
if max_boundary - min_boundary > shift_half_hour:
for i in range(min_boundary, max_boundary, shift_half_hour):
end_i = (
i + shift_half_hour - 1
if i + shift_half_hour <= max_boundary
else max_boundary
)
time_range_list.append({"start_time": i, "end_time": end_i})
else:
time_range_list.append(
{"start_time": min_boundary, "end_time": max_boundary}
)
if reversed_time:
time_range_list.reverse()
for time_range in time_range_list:
r_json = self.events(
start_time=time_range["start_time"],
end_time=time_range["end_time"],
include_start=True,
include_end=True,
q=q,
)
yield r_json, reversed_time
time_range_list = time_range_list[:]
if end_time is None:
float_start_time = start_time
while True:
end_time = int(self.server_time())
# If the time range is greater than 2 hours,
# shift float_start_time closer to end_time to prevent the stream from blocking
if end_time - float_start_time > shift_two_hours:
float_start_time = end_time - shift_two_hours
for r_json, reversed_time in fetch_events(float_start_time, end_time):
yield r_json
events = r_json.get("events", [])
if len(events) > 0:
# Update float_start_time to just past the most recent event received
if reversed_time:
float_start_time = events[-1].get("event_timestamp") - 1
else:
float_start_time = events[0].get("event_timestamp") + 1
else:
# If there are no events in the response, wait
# until new ones are added
time.sleep(5)
else:
for r_json, reversed_time in fetch_events(start_time, end_time):
yield r_json
def client_from_env() -> Moonstream:
"""

View file

@@ -0,0 +1 @@
MOONSTREAM_CLIENT_VERSION = "0.0.3"

View file

@@ -1,12 +1,14 @@
from setuptools import find_packages, setup
from moonstream.version import MOONSTREAM_CLIENT_VERSION
long_description = ""
with open("README.md") as ifp:
long_description = ifp.read()
setup(
name="moonstream",
version="0.0.2",
version=MOONSTREAM_CLIENT_VERSION,
packages=find_packages(),
package_data={"moonstream": ["py.typed"]},
install_requires=["requests", "dataclasses; python_version=='3.6'"],
@@ -14,6 +16,7 @@ setup(
"dev": [
"black",
"mypy",
"isort",
"wheel",
"types-requests",
"types-dataclasses",