Merge branch 'main' into customized-dashboard

pull/477/head
Tim Pechersky 2021-12-01 15:28:58 +00:00
commit 098f8b6b43
35 zmienionych plików z 805 dodań i 502 usunięć

Wyświetl plik

@ -31,8 +31,8 @@ jobs:
- name: Check that versions are synchronized
working-directory: ./clients/python
run: |
CLIENT_VERSION=$(python -c "from moonstream.client import CLIENT_VERSION; print(CLIENT_VERSION)")
MOONSTREAM_CLIENT_VERSION=$(python -c "from moonstream.client import MOONSTREAM_CLIENT_VERSION; print(MOONSTREAM_CLIENT_VERSION)")
SETUP_PY_VERSION=$(python setup.py --version)
echo "Client version: $CLIENT_VERSION"
echo "Client version: $MOONSTREAM_CLIENT_VERSION"
echo "setup.py version: $SETUP_PY_VERSION"
test "$CLIENT_VERSION" = "$SETUP_PY_VERSION"
test "$MOONSTREAM_CLIENT_VERSION" = "$SETUP_PY_VERSION"

Wyświetl plik

@ -0,0 +1,3 @@
[settings]
profile = black
multi_line_output = 3

Wyświetl plik

@ -6,7 +6,6 @@ import uuid
import boto3 # type: ignore
from bugout.data import BugoutSearchResults
from bugout.exceptions import BugoutResponseException
from bugout.journal import SearchOrder
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore

Wyświetl plik

@ -11,7 +11,6 @@ from fastapi.middleware.cors import CORSMiddleware
from . import actions
from . import data
from .routes.address_info import router as addressinfo_router
from .routes.nft import router as nft_router
from .routes.streams import router as streams_router
from .routes.subscriptions import router as subscriptions_router
from .routes.txinfo import router as txinfo_router
@ -32,7 +31,6 @@ tags_metadata = [
"name": "labels",
"description": "Labels for transactions, addresses with additional information.",
},
{"name": "nft", "description": "NFT market summaries."},
{"name": "streams", "description": "Operations with data streams and filters."},
{"name": "subscriptions", "description": "Operations with user subscriptions."},
{"name": "time", "description": "Server timestamp endpoints."},
@ -124,7 +122,6 @@ async def status_handler() -> data.StatusResponse:
app.include_router(addressinfo_router)
app.include_router(nft_router)
app.include_router(streams_router)
app.include_router(subscriptions_router)
app.include_router(txinfo_router)

Wyświetl plik

@ -1,13 +1,12 @@
"""
Pydantic schemas for the Moonstream HTTP API
"""
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict, Any, Union
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import BaseModel, Field
from datetime import datetime
USER_ONBOARDING_STATE = "onboarding_state"
@ -154,6 +153,7 @@ class StreamBoundary(BaseModel):
end_time: Optional[int] = None
include_start: bool = False
include_end: bool = False
reversed_time: bool = False
class Event(BaseModel):
@ -230,6 +230,10 @@ class OnboardingState(BaseModel):
steps: Dict[str, int]
class SubdcriptionsAbiResponse(BaseModel):
url: str
class DashboardMeta(BaseModel):
subscription_id: UUID
generic: Optional[List[Dict[str, str]]]
@ -249,3 +253,8 @@ class DashboardResource(BaseModel):
class DashboardCreate(BaseModel):
name: str
subscriptions: List[DashboardMeta]
class DashboardUpdate(BaseModel):
name: Optional[str]
subscriptions: List[DashboardMeta] = Field(default_factory=list)

Wyświetl plik

@ -24,17 +24,17 @@ if the order does not matter and you would rather emphasize speed. Only availabl
lists of events. (Default: True)
"""
from concurrent.futures import Future, ThreadPoolExecutor
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple
from bugout.app import Bugout
from bugout.data import BugoutResource
from sqlalchemy.orm import Session
from . import bugout, ethereum_blockchain
from .. import data
from ..stream_queries import StreamQuery
from . import bugout, ethereum_blockchain
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)
@ -74,7 +74,14 @@ def get_events(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
for provider_name, provider in event_providers.items():
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.get_events,
db_session,
@ -100,9 +107,15 @@ def get_events(
else:
raise ReceivingEventsException(e)
stream_boundary = [boundary for boundary, _ in results.values()][0]
events = [event for _, event_list in results.values() for event in event_list]
if sort_events:
events.sort(key=lambda event: event.event_timestamp, reverse=True)
# If the stream boundary's time range was reversed, the events are already
# in the correct order, so do not re-sort in reverse timestamp order.
events.sort(
key=lambda event: event.event_timestamp,
reverse=not stream_boundary.reversed_time,
)
return (stream_boundary, events)
@ -132,7 +145,14 @@ def latest_events(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
for provider_name, provider in event_providers.items():
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.latest_events,
db_session,
@ -157,7 +177,6 @@ def latest_events(
)
else:
raise ReceivingEventsException(e)
events = [event for event_list in results.values() for event in event_list]
if sort_events:
events.sort(key=lambda event: event.event_timestamp, reverse=True)
@ -185,7 +204,14 @@ def next_event(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
for provider_name, provider in event_providers.items():
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.next_event,
db_session,
@ -241,7 +267,14 @@ def previous_event(
with ThreadPoolExecutor(
max_workers=max_threads, thread_name_prefix="event_providers_"
) as executor:
for provider_name, provider in event_providers.items():
# Filter out event types that were not queried
event_providers_filtered = {
key: value
for (key, value) in event_providers.items()
if value.event_type in query.subscription_types
}
for provider_name, provider in event_providers_filtered.items():
futures[provider_name] = executor.submit(
provider.previous_event,
db_session,

Wyświetl plik

@ -327,34 +327,6 @@ class EthereumTXPoolProvider(BugoutEventProvider):
return subscriptions_filters
class PublicDataProvider(BugoutEventProvider):
def __init__(
self,
event_type: str,
description: str,
default_time_interval_seconds: int,
estimated_events_per_time_interval: float,
tags: Optional[List[str]] = None,
batch_size: int = 100,
timeout: float = 30.0,
):
super().__init__(
event_type=event_type,
description=description,
default_time_interval_seconds=default_time_interval_seconds,
estimated_events_per_time_interval=estimated_events_per_time_interval,
tags=tags,
batch_size=batch_size,
timeout=timeout,
)
def parse_filters(
self, query: StreamQuery, user_subscriptions: Dict[str, List[BugoutResource]]
) -> Optional[List[str]]:
return []
whalewatch_description = """Event provider for Ethereum whale watch.
Shows the top 10 addresses active on the Ethereum blockchain over the last hour in the following categories:
@ -364,7 +336,7 @@ Shows the top 10 addresses active on the Ethereum blockchain over the last hour
4. Amount (in WEI) received
To restrict your queries to this provider, add a filter of \"type:ethereum_whalewatch\" to your query (query parameter: \"q\") on the /streams endpoint."""
whalewatch_provider = PublicDataProvider(
whalewatch_provider = BugoutEventProvider(
event_type="ethereum_whalewatch",
description=whalewatch_description,
default_time_interval_seconds=310,
@ -384,21 +356,3 @@ ethereum_txpool_provider = EthereumTXPoolProvider(
estimated_events_per_time_interval=50,
tags=[f"client:{ETHTXPOOL_HUMBUG_CLIENT_ID}"],
)
nft_summary_description = """Event provider for NFT market summaries.
This provider periodically generates NFT market summaries for the last hour of market activity.
Currently, it summarizes the activities on the following NFT markets:
1. The Ethereum market
This provider is currently not accessible for subscription. The data from this provider is publicly
available at the /nft endpoint."""
nft_summary_provider = PublicDataProvider(
event_type="nft_summary",
description=nft_summary_description,
# 40 blocks per summary, 15 seconds per block + 2 seconds wiggle room.
default_time_interval_seconds=40 * 17,
estimated_events_per_time_interval=1,
tags=["crawl_type:nft_ethereum"],
)

Wyświetl plik

@ -1,25 +1,18 @@
from dataclasses import dataclass, field
import logging
from typing import cast, Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, cast
from bugout.app import Bugout
from bugout.data import BugoutResource
from moonstreamdb.models import (
EthereumBlock,
EthereumTransaction,
EthereumLabel,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session, Query
from moonstreamdb.models import EthereumBlock, EthereumLabel, EthereumTransaction
from sqlalchemy import and_, or_, text
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.functions import user
from .. import data
from ..stream_boundaries import validate_stream_boundary
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)
@ -59,7 +52,9 @@ def validate_subscription(
return True, errors
def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None:
def stream_boundary_validator(
stream_boundary: data.StreamBoundary,
) -> data.StreamBoundary:
"""
Stream boundary validator for the ethereum_blockchain event provider.
@ -68,9 +63,10 @@ def stream_boundary_validator(stream_boundary: data.StreamBoundary) -> None:
Raises an error for invalid stream boundaries, else returns None.
"""
valid_period_seconds = 2 * 60 * 60
validate_stream_boundary(
_, stream_boundary = validate_stream_boundary(
stream_boundary, valid_period_seconds, raise_when_invalid=True
)
return stream_boundary
@dataclass
@ -298,7 +294,7 @@ def get_events(
If the query does not require any data from this provider, returns None.
"""
stream_boundary_validator(stream_boundary)
stream_boundary = stream_boundary_validator(stream_boundary)
parsed_filters = parse_filters(query, user_subscriptions)
if parsed_filters is None:

Wyświetl plik

@ -7,7 +7,7 @@ from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Request, Query
from fastapi import APIRouter, Request, Query, Body
from .. import actions
from .. import data
@ -41,7 +41,7 @@ blockchain_by_subscription_id = {
@router.post("/", tags=["dashboards"], response_model=BugoutResource)
async def add_dashboard_handler(
request: Request, dashboard: data.DashboardCreate
request: Request, dashboard: data.DashboardCreate = Body(...)
) -> BugoutResource:
"""
Add subscription to blockchain stream data for user.
@ -231,10 +231,7 @@ async def get_dashboard_handler(
@router.put("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource)
async def update_dashboard_handler(
request: Request,
dashboard_id: str,
name: Optional[str],
subscriptions: List[data.DashboardMeta],
request: Request, dashboard_id: str, dashboard: data.DashboardUpdate = Body(...)
) -> BugoutResource:
"""
Update dashboards mainly fully overwrite name and subscription metadata
@ -244,7 +241,7 @@ async def update_dashboard_handler(
user = request.state.user
dashboard_subscriptions = subscriptions
dashboard_subscriptions = dashboard.subscriptions
params = {
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
@ -277,7 +274,7 @@ async def update_dashboard_handler(
"bucket"
]
abi_path = available_subscriptions[dashboard_subscription.subscription_id][
"abi_path"
"s3_path"
]
if bucket is None or abi_path is None:
@ -306,8 +303,7 @@ async def update_dashboard_handler(
internal_error=e,
detail=f"We can't access the abi for subscription with id:{dashboard_subscription.subscription_id}.",
)
abi = data.DashboardMeta(**response["Body"].read().decode("utf-8"))
abi = json.loads(response["Body"].read())
actions.dashboards_abi_validation(
dashboard_subscription, abi, s3_path=s3_path
@ -321,23 +317,25 @@ async def update_dashboard_handler(
dashboard_resource: Dict[str, Any] = {}
if subscriptions:
if dashboard_subscriptions:
dashboard_resource["subscriptions"] = subscriptions
dashboard_resource["dashboard_subscriptions"] = json.loads(dashboard.json())[
"subscriptions"
]
if name is not None:
dashboard_resource["name"] = name
if dashboard.name is not None:
dashboard_resource["name"] = dashboard.name
try:
resource: BugoutResource = bc.update_resource(
token=token,
resource_id=dashboard_id,
resource_data=dashboard_resource,
resource_data=data.SubscriptionUpdate(update=dashboard_resource).dict(),
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
logger.error(f"Error updating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return resource
@ -411,12 +409,11 @@ async def get_dashboard_data_links_handler(
for subscription in dashboard_subscriptions:
hash = subscription.resource_data["abi_hash"]
available_timescales = [timescale.value for timescale in data.TimeScale]
stats[subscription.id] = {}
for timescale in available_timescales:
try:
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json'
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
stats_presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={

Wyświetl plik

@ -1,59 +0,0 @@
"""
Moonstream's /nft endpoints.
These endpoints provide public access to NFT market summaries. No authentication required.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb import db
from sqlalchemy.orm import Session
from .. import data
from ..providers.bugout import nft_summary_provider
from ..settings import (
bugout_client,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
)
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/nft")
@router.get("/", tags=["streams"], response_model=data.GetEventsResponse)
async def stream_handler(
start_time: int = Query(0),
end_time: Optional[int] = Query(None),
include_start: bool = Query(False),
include_end: bool = Query(False),
db_session: Session = Depends(db.yield_db_session),
) -> data.GetEventsResponse:
stream_boundary = data.StreamBoundary(
start_time=start_time,
end_time=end_time,
include_start=include_start,
include_end=include_end,
)
result = nft_summary_provider.get_events(
db_session=db_session,
bugout_client=bugout_client,
data_journal_id=MOONSTREAM_DATA_JOURNAL_ID,
data_access_token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
stream_boundary=stream_boundary,
user_subscriptions={nft_summary_provider.event_type: []},
query=StreamQuery(subscription_types=[nft_summary_provider.event_type]),
)
if result is None:
return data.GetEventsResponse(stream_boundary=stream_boundary, events=[])
provider_stream_boundary, events = result
return data.GetEventsResponse(
stream_boundary=provider_stream_boundary, events=events
)

Wyświetl plik

@ -314,6 +314,51 @@ async def update_subscriptions_handler(
)
@router.get(
"/{subscription_id}/abi",
tags=["subscriptions"],
response_model=data.SubdcriptionsAbiResponse,
)
async def get_subscription_abi_handler(
request: Request,
subscription_id: str,
) -> data.SubdcriptionsAbiResponse:
token = request.state.token
try:
subscription_resource: BugoutResource = bc.get_resource(
token=token,
resource_id=subscription_id,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if subscription_resource.resource_data["abi"] is None:
raise MoonstreamHTTPException(
status_code=404,
detail="Subscription abi not exists.",
)
s3_client = boto3.client("s3")
result_key = f"{subscription_resource.resource_data['s3_path']}"
presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": subscription_resource.resource_data["bucket"],
"Key": result_key,
},
ExpiresIn=300,
HttpMethod="GET",
)
return data.SubdcriptionsAbiResponse(url=presigned_url)
@router.get(
"/types", tags=["subscriptions"], response_model=data.SubscriptionTypesListResponse
)

Wyświetl plik

@ -2,6 +2,7 @@
Utilities to work with stream boundaries.
"""
import time
from typing import Tuple
from .data import StreamBoundary
@ -16,7 +17,7 @@ def validate_stream_boundary(
stream_boundary: StreamBoundary,
time_difference_seconds: int,
raise_when_invalid: bool = False,
) -> bool:
) -> Tuple[bool, StreamBoundary]:
"""
This function can be used by event providers to check if a stream boundary is valid according to their
requirements.
@ -33,6 +34,16 @@ def validate_stream_boundary(
f"Stream boundary start and end times must not differ by more than {time_difference_seconds} seconds:\n{stream_boundary.json()}"
)
else:
return False
return False, stream_boundary
return True
# If required reversed time stream of events
if start_time > end_time:
include_start = stream_boundary.include_start
include_end = stream_boundary.include_end
stream_boundary.start_time = end_time
stream_boundary.end_time = start_time
stream_boundary.include_start = include_end
stream_boundary.include_end = include_start
stream_boundary.reversed_time = True
return True, stream_boundary

Wyświetl plik

@ -3,8 +3,8 @@ Tests for stream boundary utilities.
"""
import unittest
from .data import StreamBoundary
from . import stream_boundaries
from .data import StreamBoundary
class TestValidateStreamBoundary(unittest.TestCase):
@ -12,45 +12,44 @@ class TestValidateStreamBoundary(unittest.TestCase):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
self.assertTrue(
stream_boundaries.validate_stream_boundary(
stream_boundary, 10, raise_when_invalid=False
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 10, raise_when_invalid=False
)
self.assertTrue(valid)
def test_invalid_stream_boundary(self):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
self.assertFalse(
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
self.assertFalse(valid)
def test_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary(
start_time=1, end_time=5, include_start=True, include_end=True
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
valid
def test_unconstrainted_invalid_stream_boundary(self):
stream_boundary = StreamBoundary()
self.assertFalse(
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=False
)
self.assertFalse(valid)
def test_unconstrained_invalid_stream_boundary_error(self):
stream_boundary = StreamBoundary()
valid, _ = stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
with self.assertRaises(stream_boundaries.InvalidStreamBoundary):
stream_boundaries.validate_stream_boundary(
stream_boundary, 1, raise_when_invalid=True
)
valid
if __name__ == "__main__":

Wyświetl plik

@ -0,0 +1,3 @@
[settings]
profile = black
multi_line_output = 3

Wyświetl plik

@ -11,3 +11,93 @@ Install using `pip`:
```bash
pip install moonstream
```
## Usage
- Source an environment variable containing your Moonstream access token; you can create one at https://moonstream.to/account/tokens/
```python
access_token = os.environ.get("MOONSTREAM_ACCESS_TOKEN")
```
- Create an object of Moonstream client and authorize
```python
mc = Moonstream()
mc.authorize(access_token)
```
## create_stream method
Returns a stream of events for a given time range.
**From timestamp to None, from bottom to top**
When `end_time` is not set.
```python
for events in mc.create_stream(
start_time=1637834400, end_time=None, q="type:ethereum_blockchain"
):
event_timestamp_list = [e["event_timestamp"] for e in events["events"]]
print(event_timestamp_list)
```
In this case we will receive events from the bottom of history up to the present, in the following order:
```python
[1637836177, ..., 1637834440]
[1637837980, ..., 1637836226]
# Until we reach the latest event,
# after which we will receive empty lists
[]
[]
# Until new events become available
[1637839306, 1637839306, 1637839306, 1637839306]
[]
# Continuing...
```
**From timestamp to timestamp, from top to bottom**
When `start_time` is greater than `end_time`.
```python
for events in mc.create_stream(
start_time=1637839281, end_time=1637830890, q="type:ethereum_blockchain"
):
event_timestamp_list = [e["event_timestamp"] for e in events["events"]]
print(event_timestamp_list)
```
The stream of event packs will be generated from the most recent timestamp to the oldest, and the inner list of events in each pack is likewise ordered from most recent to oldest:
```python
[1637839280, ..., 1637838094]
[1637838086, ..., 1637836340]
...
[1637834488, ..., 1637832699]
[1637832676, ..., 1637830903]
```
**From timestamp to timestamp, from bottom to top**
When `start_time` is less than `end_time`.
```python
for events in mc.create_stream(
start_time=1637830890, end_time=1637839281, q="type:ethereum_blockchain"
):
event_timestamp_list = [e["event_timestamp"] for e in events["events"]]
print(event_timestamp_list)
```
You will receive lists of events from the bottom of history up to the newest:
```python
[1637832676, ..., 1637830903]
[1637834488, ..., 1637832699]
...
[1637838086, ..., 1637836340]
[1637839280, ..., 1637838094]
```

Wyświetl plik

@ -1,20 +1,19 @@
from dataclasses import dataclass, field
import logging
import os
from typing import Any, Dict, List, Optional
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, List, Optional, Tuple
import requests
from .version import MOONSTREAM_CLIENT_VERSION
logger = logging.getLogger(__name__)
log_level = logging.INFO
if os.environ.get("DEBUG", "").lower() in ["true", "1"]:
log_level = logging.DEBUG
logger.setLevel(log_level)
# Keep this synchronized with the version in setup.py
CLIENT_VERSION = "0.0.2"
ENDPOINT_PING = "/ping"
ENDPOINT_VERSION = "/version"
ENDPOINT_NOW = "/now"
@ -100,7 +99,9 @@ class Moonstream:
self.timeout = timeout
self._session = requests.Session()
self._session.headers.update(
{"User-Agent": f"Moonstream Python client (version {CLIENT_VERSION})"}
{
"User-Agent": f"Moonstream Python client (version {MOONSTREAM_CLIENT_VERSION})"
}
)
def ping(self) -> Dict[str, Any]:
@ -388,6 +389,96 @@ class Moonstream:
r.raise_for_status()
return r.json()
def create_stream(
self,
start_time: int,
end_time: Optional[int] = None,
q: str = "",
) -> Generator[Dict[str, Any], None, None]:
"""
Return a stream of events. Event packs are generated over fixed-size time ranges.
Arguments:
- start_time - One boundary of the time range.
- end_time - The other boundary; if set to None, the stream continues forward endlessly.
- q - Optional query to filter over your available subscriptions and subscription types.
Returns: A dictionary stream representing the results of your query.
"""
# TODO(kompotkot): Add tests
shift_two_hours = 2 * 60 * 60 # 2 hours
shift_half_hour = 1 * 30 * 30  # NOTE(review): 1 * 30 * 30 = 900 s = 15 min, not 30 min — likely meant 30 * 60; confirm
def fetch_events(
modified_start_time: int, modified_end_time: int
) -> Generator[Tuple[Dict[str, Any], bool], None, None]:
# If it is going from top to bottom in history,
# then time_range will be reversed
reversed_time = False
if modified_start_time > modified_end_time:
reversed_time = True
max_boundary = max(modified_start_time, modified_end_time)
min_boundary = min(modified_start_time, modified_end_time)
time_range_list = []
# 300, 450 with shift 100 => [{"start_time": 300, "end_time": 399}, {"start_time": 400, "end_time": 450}]
if max_boundary - min_boundary > shift_half_hour:
for i in range(min_boundary, max_boundary, shift_half_hour):
end_i = (
i + shift_half_hour - 1
if i + shift_half_hour <= max_boundary
else max_boundary
)
time_range_list.append({"start_time": i, "end_time": end_i})
else:
time_range_list.append(
{"start_time": min_boundary, "end_time": max_boundary}
)
if reversed_time:
time_range_list.reverse()
for time_range in time_range_list:
r_json = self.events(
start_time=time_range["start_time"],
end_time=time_range["end_time"],
include_start=True,
include_end=True,
q=q,
)
yield r_json, reversed_time
time_range_list = time_range_list[:]
if end_time is None:
float_start_time = start_time
while True:
end_time = int(self.server_time())
# If the time range is greater than 2 hours,
# shift float_start_time close to end_time to prevent the stream from blocking.
# NOTE(review): the assignment below sets float_start_time to the constant
# shift_two_hours (7200) — likely intended `end_time - shift_two_hours`; confirm.
if end_time - float_start_time > shift_two_hours:
float_start_time = shift_two_hours
for r_json, reversed_time in fetch_events(float_start_time, end_time):
yield r_json
events = r_json.get("events", [])
if len(events) > 0:
# Updating float_start_time after first iteration to last event time
if reversed_time:
float_start_time = events[-1].get("event_timestamp") - 1
else:
float_start_time = events[0].get("event_timestamp") + 1
else:
# If there are no events in response, wait
# until new will be added
time.sleep(5)
else:
for r_json, reversed_time in fetch_events(start_time, end_time):
yield r_json
def client_from_env() -> Moonstream:
"""

Wyświetl plik

@ -0,0 +1 @@
MOONSTREAM_CLIENT_VERSION = "0.0.3"

Wyświetl plik

@ -1,12 +1,14 @@
from setuptools import find_packages, setup
from moonstream.version import MOONSTREAM_CLIENT_VERSION
long_description = ""
with open("README.md") as ifp:
long_description = ifp.read()
setup(
name="moonstream",
version="0.0.2",
version=MOONSTREAM_CLIENT_VERSION,
packages=find_packages(),
package_data={"moonstream": ["py.typed"]},
install_requires=["requests", "dataclasses; python_version=='3.6'"],
@ -14,6 +16,7 @@ setup(
"dev": [
"black",
"mypy",
"isort",
"wheel",
"types-requests",
"types-dataclasses",

Wyświetl plik

@ -44,6 +44,7 @@ POLYGON_MISSING_SERVICE_FILE="polygon-missing.service"
POLYGON_MISSING_TIMER_FILE="polygon-missing.timer"
POLYGON_STATISTICS_SERVICE_FILE="polygon-statistics.service"
POLYGON_STATISTICS_TIMER_FILE="polygon-statistics.timer"
POLYGON_TXPOOL_SERVICE_FILE="polygon-txpool.service"
set -eu
@ -52,8 +53,8 @@ echo
echo
echo -e "${PREFIX_INFO} Building executable Ethereum transaction pool crawler script with Go"
EXEC_DIR=$(pwd)
cd "${APP_CRAWLERS_DIR}/ethtxpool"
HOME=/root /usr/local/go/bin/go build -o "${APP_CRAWLERS_DIR}/ethtxpool/ethtxpool" "${APP_CRAWLERS_DIR}/ethtxpool/main.go"
cd "${APP_CRAWLERS_DIR}/txpool"
HOME=/root /usr/local/go/bin/go build -o "${APP_CRAWLERS_DIR}/txpool/txpool" "${APP_CRAWLERS_DIR}/txpool/main.go"
cd "${EXEC_DIR}"
echo
@ -141,3 +142,11 @@ cp "${SCRIPT_DIR}/${POLYGON_STATISTICS_SERVICE_FILE}" "/etc/systemd/system/${POL
cp "${SCRIPT_DIR}/${POLYGON_STATISTICS_TIMER_FILE}" "/etc/systemd/system/${POLYGON_STATISTICS_TIMER_FILE}"
systemctl daemon-reload
systemctl restart "${POLYGON_STATISTICS_TIMER_FILE}"
# echo
# echo
# echo -e "${PREFIX_INFO} Replacing existing Polygon transaction pool crawler service definition with ${POLYGON_TXPOOL_SERVICE_FILE}"
# chmod 644 "${SCRIPT_DIR}/${POLYGON_TXPOOL_SERVICE_FILE}"
# cp "${SCRIPT_DIR}/${POLYGON_TXPOOL_SERVICE_FILE}" "/etc/systemd/system/${POLYGON_TXPOOL_SERVICE_FILE}"
# systemctl daemon-reload
# systemctl restart "${POLYGON_TXPOOL_SERVICE_FILE}"

Wyświetl plik

@ -5,9 +5,9 @@ After=network.target
[Service]
User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/ethtxpool
WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream/crawlers/ethtxpool/ethtxpool
ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain ethereum
SyslogIdentifier=ethereum-txpool
[Install]

Wyświetl plik

@ -0,0 +1,14 @@
[Unit]
Description=Polygon txpool crawler
After=network.target
[Service]
User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/txpool
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
ExecStart=/home/ubuntu/moonstream/crawlers/txpool/txpool -blockchain polygon
SyslogIdentifier=polygon-txpool
[Install]
WantedBy=multi-user.target

Wyświetl plik

@ -1,5 +0,0 @@
export MOONSTREAM_NODE_ETHEREUM_IPC_ADDR="127.0.0.1"
export MOONSTREAM_NODE_ETHEREUM_IPC_PORT="8545"
export ETHTXPOOL_HUMBUG_CLIENT_ID="<client id for the crawling machine>"
export ETHTXPOOL_HUMBUG_TOKEN="<Generate an integration and a Humbug token from https://bugout.dev/account/teams>"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout Humbug token for crash reports>"

Wyświetl plik

@ -9,11 +9,12 @@ import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResources
from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy import Column, and_, func, text
from sqlalchemy import Column, and_, func, text, distinct
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.operators import in_op
@ -29,6 +30,7 @@ from ..settings import (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
CRAWLER_LABEL,
)
from ..reporter import reporter
from ..settings import bugout_client as bc
from web3 import Web3
@ -49,6 +51,8 @@ blockchain_by_subscription_id = {
class TimeScale(Enum):
# TODO(Andrey): Unlock once we are sure about the performance of aggregation on the transactions table.
# Right now it can hang.
# year = "year"
month = "month"
week = "week"
@ -69,6 +73,9 @@ timescales_delta: Dict[str, Dict[str, timedelta]] = {
"day": {"timedelta": timedelta(hours=24)},
}
abi_type_to_dashboards_type = {"function": "methods", "event": "events"}
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
@ -78,11 +85,11 @@ def push_statistics(
subscription: Any,
timescale: str,
bucket: str,
hash: str,
dashboard_id: UUID,
) -> None:
result_bytes = json.dumps(statistics_data).encode("utf-8")
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{hash}/v1/{timescale}.json'
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
s3 = boto3.client("s3")
s3.put_object(
@ -336,8 +343,7 @@ def generate_data(
label_counts_subquery = (
label_counts.group_by(
text("timeseries_points"),
label_model.label_data["name"].astext,
text("timeseries_points"), label_model.label_data["name"].astext
)
.order_by(text("timeseries_points desc"))
.subquery(name="label_counts")
@ -405,10 +411,97 @@ def get_unique_address(
)
def generate_list_of_names(
    type: str, subscription_filters: Dict[str, Any], read_abi: bool, abi_json: Any
):
    """
    Build the list of ABI entry names of the given ``type``.

    When ``read_abi`` is True the names are read straight from the ABI
    JSON; otherwise they come from the dashboard subscription filters,
    whose key is resolved via ``abi_type_to_dashboards_type``.
    """
    if read_abi:
        entries = (item for item in abi_json if item["type"] == type)
    else:
        entries = iter(subscription_filters[abi_type_to_dashboards_type[type]])
    return [entry["name"] for entry in entries]
def process_external(
    abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType
):
    """
    Execute every configured external (read-only) contract call and
    collect the responses.

    Each entry of ``abi_external_calls`` describes one ``view`` function:
    target address, function name, inputs with concrete values, and
    outputs. Entries that fail to parse, or whose on-chain call raises,
    are reported to stdout and skipped.

    TODO:(Andrey) Check possibility to do it via AsyncHttpProvider
    (not supported for some of the middlewares).
    """
    # First pass: turn each call description into a minimal single-function
    # ABI plus the already-cast positional arguments.
    prepared_calls = []
    for call_spec in abi_external_calls:
        try:
            inputs_abi = []
            call_args = []
            for arg in call_spec["inputs"]:
                inputs_abi.append({"name": arg["name"], "type": arg["type"]})
                # Cast the configured raw value to the Python type matching
                # the Solidity input type.
                call_args.append(
                    cast_to_python_type(arg["type"])(arg["value"])
                )
            single_function_abi = [
                {
                    "name": call_spec["name"],
                    "inputs": inputs_abi,
                    "outputs": call_spec["outputs"],
                    "type": "function",
                    "stateMutability": "view",
                }
            ]
            prepared_calls.append(
                {
                    "display_name": call_spec["display_name"],
                    "address": Web3.toChecksumAddress(call_spec["address"]),
                    "name": call_spec["name"],
                    "abi": single_function_abi,
                    "input_args": call_args,
                }
            )
        except Exception as e:
            print(f"Error processing external call: {e}")

    # Second pass: execute the prepared calls against the node.
    web3_client = connect(blockchain)
    extention_data = []
    for prepared in prepared_calls:
        try:
            contract = web3_client.eth.contract(
                address=prepared["address"], abi=prepared["abi"]
            )
            response = contract.functions[prepared["name"]](
                *prepared["input_args"]
            ).call()
            extention_data.append(
                {"display_name": prepared["display_name"], "value": response}
            )
        except Exception as e:
            print(f"Failed to call {prepared['name']} error: {e}")
    return extention_data
def get_count(
name: str,
type: str,
db_session: Session,
select_expression: Any,
blockchain_type: AvailableBlockchainType,
address: str,
):
@ -418,7 +511,7 @@ def get_count(
label_model = get_label_model(blockchain_type)
return (
db_session.query(label_model)
db_session.query(select_expression)
.filter(label_model.address == address)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label_data["type"].astext == type)
@ -435,222 +528,223 @@ def stats_generate_handler(args: argparse.Namespace):
with yield_db_session_ctx() as db_session:
# read all subscriptions
required_subscriptions: BugoutResources = bc.list_resources(
# ethereum_blockchain
start_time = time.time()
blockchain_type = AvailableBlockchainType(args.blockchain)
# polygon_blockchain
dashboard_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
timeout=10,
)
print(f"Amount of dashboards: {len(dashboard_resources.resources)}")
# Create subscriptions dict for get subscriptions by id.
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={
"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
"abi": "true",
"subscription_type_id": subscription_id_by_blockchain[args.blockchain],
},
timeout=10,
)
print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
print(
f"Amount of blockchain subscriptions: {len(blockchain_subscriptions.resources)}"
)
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in blockchain_subscriptions.resources
}
s3_client = boto3.client("s3")
# Already processed
already_processed = []
for dashboard in dashboard_resources.resources:
for subscription in required_subscriptions.resources:
bucket = subscription.resource_data["bucket"]
key = subscription.resource_data["s3_path"]
address = subscription.resource_data["address"]
for dashboard_subscription_filters in dashboard.resource_data[
"dashboard_subscriptions"
]:
print(f"Expected bucket: s3://{bucket}/{key}")
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
abi_string = json.dumps(abi_json, sort_keys=True, indent=2)
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
if f"{address}/{hash}" in already_processed:
continue
s3_data_object = {}
abi_functions = [item for item in abi_json if item["type"] == "function"]
abi_events = [item for item in abi_json if item["type"] == "event"]
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
external_calls = []
for external_call in abi_external_calls:
try:
func_input_abi = []
input_args = []
for func_input in external_call["inputs"]:
func_input_abi.append(
{"name": func_input["name"], "type": func_input["type"]}
)
input_args.append(
cast_to_python_type(func_input["type"])(func_input["value"])
)
subscription_id = dashboard_subscription_filters["subscription_id"]
func_abi = [
{
"name": external_call["name"],
"inputs": func_input_abi,
"outputs": external_call["outputs"],
"type": "function",
"stateMutability": "view",
}
if subscription_id not in subscription_by_id:
# Meen it's are different blockchain type
continue
s3_data_object = {}
extention_data = []
address = subscription_by_id[subscription_id].resource_data[
"address"
]
external_calls.append(
{
"display_name": external_call["display_name"],
"address": Web3.toChecksumAddress(external_call["address"]),
"name": external_call["name"],
"abi": func_abi,
"input_args": input_args,
}
)
except Exception as e:
print(f"Error processing external call: {e}")
generic = dashboard_subscription_filters["generic"]
web3_client = connect(blockchain_type)
# {
# "type": "external_call"
# "display_name": "Total weth earned"
# "address": "0xdf2811b6432cae65212528f0a7186b71adaec03a",
# "name": "balanceOf",
# "inputs": [
# {
# "name": "owner",
# "type": "address"
# "value": "0xA993c4759B731f650dfA011765a6aedaC91a4a88"
# }
# ],
# "outputs": [
# {
# "internalType": "uint256",
# "name": "",
# "type": "uint256"
# }
# }
if not subscription_by_id[subscription_id].resource_data["abi"]:
extention_data = []
for extcall in external_calls:
try:
contract = web3_client.eth.contract(
address=extcall["address"], abi=extcall["abi"]
)
response = contract.functions[extcall["name"]](
*extcall["input_args"]
).call()
methods = []
events = []
else:
bucket = subscription_by_id[subscription_id].resource_data[
"bucket"
]
key = subscription_by_id[subscription_id].resource_data[
"s3_path"
]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_events"],
abi_json=abi_json,
)
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{"display_name": extcall["display_name"], "value": response}
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
),
}
)
except Exception as e:
print(f"Failed to call {extcall['name']} error: {e}")
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
),
}
)
if "HatchStartedEvent" in events:
abi_functions_names = [item["name"] for item in abi_functions]
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
),
}
)
abi_events_names = [item["name"] for item in abi_events]
if "HatchFinishedEvent" in events:
if "HatchStartedEvent" in abi_events_names:
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data[
"args"
]["tokenId"]
),
blockchain_type=blockchain_type,
address=address,
),
}
)
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
for timescale in [timescale.value for timescale in TimeScale]:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
print(f"Timescale: {timescale}")
s3_data_object["web3_metric"] = extention_data
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
),
}
)
timescale=timescale,
functions=methods,
start=start_date,
metric_type="tx_call",
)
if "HatchFinishedEvent" in abi_events_names:
s3_data_object["functions"] = functions_calls_data
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
),
}
)
timescale=timescale,
functions=events,
start=start_date,
metric_type="event",
)
for timescale in [timescale.value for timescale in TimeScale]:
s3_data_object["events"] = events_data
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
s3_data_object["generic"] = generate_metrics(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
metrics=generic,
start=start_date,
)
print(f"Timescale: {timescale}")
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
timescale=timescale,
bucket=bucket,
dashboard_id=dashboard.id,
)
except Exception as err:
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard.id}",
],
)
print(err)
s3_data_object["web3_metric"] = extention_data
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=abi_functions_names,
start=start_date,
metric_type="tx_call",
)
s3_data_object["functions"] = functions_calls_data
# generate data
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=abi_events_names,
start=start_date,
metric_type="event",
)
s3_data_object["events"] = events_data
s3_data_object["generic"] = generate_metrics(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
metrics=abi_events_names,
start=start_date,
)
push_statistics(
statistics_data=s3_data_object,
subscription=subscription,
timescale=timescale,
bucket=bucket,
hash=hash,
)
already_processed.append(f"{address}/{hash}")
reporter.custom_report(
title=f"Dashboard stats generated.",
content=f"Generate statistics for {args.blockchain}. \n Generation time: {time.time() - start_time}.",
tags=["dashboard", "statistics", f"blockchain:{args.blockchain}"],
)
def main() -> None:

Wyświetl plik

@ -0,0 +1,40 @@
package cmd

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
)

// Transaction mirrors the JSON shape of a transaction entry as returned by
// the node's txpool_content RPC call. Pointer fields are nil when the node
// omits the corresponding key (e.g. legacy transactions have no
// maxFeePerGas).
type Transaction struct {
	Type hexutil.Uint64 `json:"type"`

	// Common transaction fields:
	Nonce                *hexutil.Uint64 `json:"nonce"`
	GasPrice             *hexutil.Big    `json:"gasPrice"`
	MaxPriorityFeePerGas *hexutil.Big    `json:"maxPriorityFeePerGas"`
	MaxFeePerGas         *hexutil.Big    `json:"maxFeePerGas"`
	Gas                  *hexutil.Uint64 `json:"gas"`
	Value                *hexutil.Big    `json:"value"`
	Data                 *hexutil.Bytes  `json:"input"`
	V                    *hexutil.Big    `json:"v"`
	R                    *hexutil.Big    `json:"r"`
	S                    *hexutil.Big    `json:"s"`
	To                   *common.Address `json:"to"`

	// Access list transaction fields:
	ChainID *hexutil.Big `json:"chainId,omitempty"`
	// AccessList *AccessList `json:"accessList,omitempty"`

	// Only used for encoding:
	Hash common.Hash `json:"hash"`
}

// PendingTransaction pairs a pending transaction with the sender address and
// nonce under which it was found in the txpool.
type PendingTransaction struct {
	From        string       `json:"from"`
	Nonce       uint64       `json:"nonce"`
	Transaction *Transaction `json:"transaction"`
}

// PendingTransactions wraps a pending transaction for serialization.
// NOTE(review): despite the plural name, the field holds a single
// PendingTransaction, not a slice — confirm whether this is intentional.
type PendingTransactions struct {
	Transactions PendingTransaction `json:"transactions"`
}

Wyświetl plik

@ -2,22 +2,23 @@
Ethereum blockchain transaction pool crawler.
Execute:
go run main.go -geth http://127.0.0.1:8545
go run main.go -blockchain ethereum -interval 1
*/
package main
package cmd
import (
"encoding/json"
"flag"
"fmt"
"math/big"
"log"
"os"
"strings"
"time"
settings "github.com/bugout-dev/moonstream/crawlers/txpool/configs"
humbug "github.com/bugout-dev/humbug/go/pkg"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/google/uuid"
)
@ -29,40 +30,6 @@ func humbugClient(sessionID string, clientID string, humbugToken string) (*humbu
return reporter, err
}
type Transaction struct {
Type hexutil.Uint64 `json:"type"`
// Common transaction fields:
Nonce *hexutil.Uint64 `json:"nonce"`
GasPrice *hexutil.Big `json:"gasPrice"`
MaxPriorityFeePerGas *hexutil.Big `json:"maxPriorityFeePerGas"`
MaxFeePerGas *hexutil.Big `json:"maxFeePerGas"`
Gas *hexutil.Uint64 `json:"gas"`
Value *hexutil.Big `json:"value"`
Data *hexutil.Bytes `json:"input"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
To *common.Address `json:"to"`
// Access list transaction fields:
ChainID *hexutil.Big `json:"chainId,omitempty"`
// AccessList *AccessList `json:"accessList,omitempty"`
// Only used for encoding:
Hash common.Hash `json:"hash"`
}
type PendingTransaction struct {
From string `json:"from"`
Nonce uint64 `json:"nonce"`
Transaction *Transaction `json:"transaction"`
}
type PendingTransactions struct {
Transactions PendingTransaction `json:"transactions"`
}
// Split list of reports on nested lists
func generateChunks(xs []humbug.Report, chunkSize int) [][]humbug.Report {
if len(xs) == 0 {
@ -83,8 +50,7 @@ func generateChunks(xs []humbug.Report, chunkSize int) [][]humbug.Report {
}
// Fetch list of transactions form Ethereum TxPool
func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.HumbugReporter) {
initPoll := true
func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.HumbugReporter, blockchain string) {
currentTransactions := make(map[common.Hash]bool)
// Structure of the map:
@ -92,17 +58,17 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu
var result map[string]map[string]map[uint64]*Transaction
for {
fmt.Println("Checking pending transactions in node:")
log.Println("Checking pending transactions in node")
gethClient.Call(&result, "txpool_content")
pendingTransactions := result["pending"]
// Mark all transactions from previous iteration as false
cacheSize := 0
cacheSizeCounter := 0
for transactionHash := range currentTransactions {
currentTransactions[transactionHash] = false
cacheSize++
cacheSizeCounter++
}
fmt.Printf("\tSize of pending transactions cache at the beginning: %d\n", cacheSize)
log.Printf("Size of pending transactions cache at the beginning: %d\n", cacheSizeCounter)
reports := []humbug.Report{}
@ -112,6 +78,7 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu
for nonce, transaction := range transactionsByNonce {
pendingTx := PendingTransaction{From: fromAddress, Nonce: nonce, Transaction: transaction}
// Check if transaction already exist in our currentTransactions list and pass this transaction
transactionHash := transaction.Hash
_, transactionProcessed := currentTransactions[transactionHash]
if !transactionProcessed {
@ -121,7 +88,7 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu
continue
}
ReportTitle := "Ethereum: Pending transaction: " + transactionHash.String()
ReportTitle := fmt.Sprintf("%s: Pending transaction: ", strings.Title(blockchain)) + transactionHash.String()
ReportTags := []string{
"hash:" + transactionHash.String(),
"from_address:" + fromAddress,
@ -130,8 +97,8 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu
fmt.Sprintf("max_priority_fee_per_gas:%d", pendingTx.Transaction.MaxPriorityFeePerGas.ToInt()),
fmt.Sprintf("max_fee_per_gas:%d", pendingTx.Transaction.MaxFeePerGas.ToInt()),
fmt.Sprintf("gas:%d", pendingTx.Transaction.Gas),
fmt.Sprintf("value:%d", new(big.Float).Quo(new(big.Float).SetInt(transaction.Value.ToInt()), big.NewFloat(params.Ether))),
"crawl_type:ethereum_txpool",
fmt.Sprintf("value:%d", transaction.Value.ToInt()),
fmt.Sprintf("crawl_type:%s_txpool", blockchain),
}
report := humbug.Report{
Title: ReportTitle,
@ -145,10 +112,12 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu
}
}
if !initPoll {
// TODO(kompotkot): Passing txs is wrong solution, but for end user
// it is similar like this txs even not passed through this node.
if len(reports) < 10000 {
reportChunks := generateChunks(reports, 500)
for _, chunk := range reportChunks {
fmt.Printf("\tPublishing chunk with: %d/%d reports\n", len(chunk), addedTransactionsCounter)
log.Printf("Published chunk with: %d/%d reports\n", len(chunk), addedTransactionsCounter)
reporter.PublishBulk(chunk)
time.Sleep(time.Duration(interval) * time.Second)
}
@ -163,30 +132,36 @@ func PollTxpoolContent(gethClient *rpc.Client, interval int, reporter *humbug.Hu
droppedTransactionsCounter++
}
}
fmt.Printf("\tDropped transactions: %d\n", droppedTransactionsCounter)
log.Printf("Dropped transactions: %d\n", droppedTransactionsCounter)
fmt.Printf("Sleeping for %d seconds\n", interval)
log.Printf("Sleeping for %d seconds\n", interval)
time.Sleep(time.Duration(interval) * time.Second)
} else {
fmt.Printf("Initial start of crawler, too many transactions: %d, passing them...\n", addedTransactionsCounter)
initPoll = false
log.Printf("Too many transactions: %d, passing them...\n", addedTransactionsCounter)
}
}
}
func main() {
func InitTxPool() {
var blockchain string
var intervalSeconds int
flag.StringVar(&blockchain, "blockchain", "", "Blockchain to crawl")
flag.IntVar(&intervalSeconds, "interval", 1, "Number of seconds to wait between RPC calls to query the transaction pool (default: 1)")
flag.Parse()
var MOONSTREAM_NODE_ETHEREUM_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_IPC_ADDR")
var MOONSTREAM_NODE_ETHEREUM_IPC_PORT = os.Getenv("MOONSTREAM_NODE_ETHEREUM_IPC_PORT")
var MOONSTREAM_IPC_PATH = fmt.Sprintf("http://%s:%s", MOONSTREAM_NODE_ETHEREUM_IPC_ADDR, MOONSTREAM_NODE_ETHEREUM_IPC_PORT)
switch blockchain {
case "ethereum", "polygon":
log.Printf("%s blockchain\n", strings.Title(blockchain))
default:
panic(fmt.Sprintln("Invalid blockchain provided"))
}
MOONSTREAM_IPC_PATH := settings.GetIpcPath(blockchain)
sessionID := uuid.New().String()
// Humbug crash client to collect errors
crashReporter, err := humbugClient(sessionID, "moonstream-crawlers", os.Getenv("HUMBUG_REPORTER_CRAWLERS_TOKEN"))
crashReporter, err := humbugClient(sessionID, "moonstream-crawlers", settings.HUMBUG_REPORTER_CRAWLERS_TOKEN)
if err != nil {
panic(fmt.Sprintf("Invalid Humbug Crash configuration: %s", err.Error()))
}
@ -208,10 +183,10 @@ func main() {
defer gethClient.Close()
// Humbug client to be able write data in Bugout journal
reporter, err := humbugClient(sessionID, os.Getenv("ETHTXPOOL_HUMBUG_CLIENT_ID"), os.Getenv("ETHTXPOOL_HUMBUG_TOKEN"))
reporter, err := humbugClient(sessionID, settings.HUMBUG_TXPOOL_CLIENT_ID, settings.HUMBUG_TXPOOL_TOKEN)
if err != nil {
panic(fmt.Sprintf("Invalid Humbug configuration: %s", err.Error()))
}
PollTxpoolContent(gethClient, intervalSeconds, reporter)
PollTxpoolContent(gethClient, intervalSeconds, reporter, blockchain)
}

Wyświetl plik

@ -0,0 +1,20 @@
package settings

import (
	"fmt"
	"os"
	"strings"
)

// Internal crash journal to collect errors
var HUMBUG_REPORTER_CRAWLERS_TOKEN = os.Getenv("HUMBUG_REPORTER_CRAWLERS_TOKEN")

// Bugout Humbug credentials used to publish pending-transaction reports.
var HUMBUG_TXPOOL_CLIENT_ID = os.Getenv("HUMBUG_TXPOOL_CLIENT_ID")
var HUMBUG_TXPOOL_TOKEN = os.Getenv("HUMBUG_TXPOOL_TOKEN")
// Geth connection URL
// GetIpcPath returns the HTTP endpoint of the JSON-RPC node for the given
// blockchain, assembled from the MOONSTREAM_NODE_<CHAIN>_IPC_ADDR and
// MOONSTREAM_NODE_<CHAIN>_IPC_PORT environment variables. Unset variables
// yield empty host/port components.
func GetIpcPath(blockchain string) string {
	chain := strings.ToUpper(blockchain)
	addr := os.Getenv(fmt.Sprintf("MOONSTREAM_NODE_%s_IPC_ADDR", chain))
	port := os.Getenv(fmt.Sprintf("MOONSTREAM_NODE_%s_IPC_PORT", chain))
	return fmt.Sprintf("http://%s:%s", addr, port)
}

Wyświetl plik

@ -1,4 +1,4 @@
module github.com/bugout-dev/moonstream/crawlers/ethtxpool
module github.com/bugout-dev/moonstream/crawlers/txpool
go 1.16

Wyświetl plik

@ -0,0 +1,9 @@
package main

import (
	"github.com/bugout-dev/moonstream/crawlers/txpool/cmd"
)

// Entry point: delegates to the txpool crawler, which parses its own flags
// and starts polling the node's transaction pool.
func main() {
	cmd.InitTxPool()
}

Wyświetl plik

@ -0,0 +1,7 @@
export MOONSTREAM_NODE_ETHEREUM_IPC_ADDR="127.0.0.1"
export MOONSTREAM_NODE_ETHEREUM_IPC_PORT="8545"
export MOONSTREAM_NODE_POLYGON_IPC_ADDR="127.0.0.1"
export MOONSTREAM_NODE_POLYGON_IPC_PORT="8545"
export HUMBUG_TXPOOL_CLIENT_ID="<client id for the crawling machine>"
export HUMBUG_TXPOOL_TOKEN="<Generate an integration and a Humbug token from https://bugout.dev/account/teams>"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout Humbug token for crash reports>"

Wyświetl plik

@ -259,8 +259,7 @@ const Analytics = () => {
<SubscriptionReport
timeRange={timeRange}
url={s3PresignedURLs[timeRange]}
id={v4()}
type={v4()}
id={dashboardId}
refetchLinks={dashboardLinksCache.refetch}
/>
</Flex>

Wyświetl plik

@ -65,6 +65,30 @@ const NewDashboard = (props) => {
subscriptions.subscriptionsCache.data?.subscriptions
);
useEffect(() => {
newDashboardForm.subscriptions.forEach((element, idx) => {
const subscription =
subscriptions.subscriptionsCache.data?.subscriptions.find(
(subscription_item) =>
element.subscription_id === subscription_item.id
);
if (
element.subscription_id &&
subscription &&
newDashboardForm.subscriptions[idx].abi !== subscription?.abi
) {
const newestDashboardForm = { ...newDashboardForm };
newestDashboardForm.subscriptions[idx].abi = subscription.abi;
setNewDashboardForm(newestDashboardForm);
}
});
}, [
subscriptions.subscriptionsCache.data,
newDashboardForm,
setNewDashboardForm,
]);
useEffect(() => {
if (!subscriptions.subscriptionsCache.isLoading) {
const massaged = subscriptions.subscriptionsCache.data?.subscriptions.map(
@ -195,7 +219,6 @@ const NewDashboard = (props) => {
};
setNewDashboardForm(newState);
}}
// isOpen={showSuggestions}
itemToString={(item) => (item ? item.label : "")}
initialSelectedItem={subscibedItem ?? undefined}
>
@ -208,7 +231,6 @@ const NewDashboard = (props) => {
isOpen,
inputValue,
highlightedIndex,
// selectedItem,
getRootProps,
}) => {
const labelColor =
@ -217,7 +239,6 @@ const NewDashboard = (props) => {
return (
<Box pos="relative">
<Box
// style={comboboxStyles}
{...getRootProps(
{},
{ suppressRefError: true }
@ -252,9 +273,6 @@ const NewDashboard = (props) => {
placeholder="Subscription to use in dashboard"
isTruncated
fontSize="sm"
// defaultValue={
// subscibedItem?.label ?? "yoyoy"
// }
{...getInputProps({
defaultValue:
subscibedItem?.label ?? "iha",
@ -271,18 +289,6 @@ const NewDashboard = (props) => {
</InputRightAddon>
</InputGroup>
</Box>
{/* <Menu
isOpen={isOpen}
// style={menuStyles}
// position="absolute"
colorScheme="blue"
bgColor="gray.300"
inset="unset"
// spacing={2}
// p={2}
> */}
{isOpen ? (
<Stack
// display="flex"
@ -416,35 +422,6 @@ const NewDashboard = (props) => {
}}
</Downshift>
</>
// <AutoComplete
// openOnFocus
// creatable
// suggestWhenEmpty
// >
// <AutoCompleteInput variant="filled" />
// <AutoCompleteList>
// {pickerItems?.map((subscription, oid) => (
// <AutoCompleteItem
// key={`row-${idx}-option-${oid}`}
// value={subscription.address}
// textTransform="capitalize"
// align="center"
// >
// {/* <Avatar
// size="sm"
// // name={person.name}
// // src={person.image}
// /> */}
// <Text ml="4">{subscription.label}</Text>
// </AutoCompleteItem>
// ))}
// {/* <AutoCompleteCreatable>
// {({ value }) => (
// <span>New Subscription: {value}</span>
// )}
// </AutoCompleteCreatable> */}
// </AutoCompleteList>
// </AutoComplete>
)}
</Td>
<Td p={1} textAlign="center">
@ -460,7 +437,7 @@ const NewDashboard = (props) => {
onClick={() =>
overlay.toggleModal({
type: MODAL_TYPES.UPLOAD_ABI,
props: { id: subscibedItem.id },
props: { id: subscibedItem.subscription_id },
})
}
>
@ -620,14 +597,6 @@ const NewDashboard = (props) => {
</Button>
</Center>
</Box>
{/* <Box>
<FormLabel htmlFor="desc">ABI</FormLabel>
<Textarea
id="desc"
placeholder="ABI Upload element should be here instead"
/>
</Box> */}
</Stack>
</>
);

Wyświetl plik

@ -12,12 +12,12 @@ timeMap[HOUR_KEY] = "hour";
timeMap[DAY_KEY] = "day";
timeMap[WEEK_KEY] = "week";
const SubscriptionReport = ({ timeRange, url, id, type, refetchLinks }) => {
const SubscriptionReport = ({ timeRange, url, id, refetchLinks }) => {
const { data, isLoading } = usePresignedURL({
url: url,
isEnabled: true,
id: id,
type: type,
cacheType: timeRange,
requestNewURLCallback: refetchLinks,
});
const plotMinW = "250px";

Wyświetl plik

@ -33,7 +33,7 @@ export const deleteDashboard = (id) => {
console.log("delete:", id);
return http({
method: "DELETE",
url: `${API_URL}/dashboards/${id}/`,
url: `${API_URL}/dashboards/${id}`,
});
};