Optimized to work with both blockchains, added cli to setup

pull/391/head
kompotkot 2021-11-13 15:21:41 +00:00
rodzic 77dd49405d
commit bf6a94a17e
6 zmienionych plików z 216 dodań i 176 usunięć

Wyświetl plik

@ -5,8 +5,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from moonstreamdb.db import yield_db_session, yield_db_session_ctx
from moonstreamdb.models import (
EthereumBlock,
EthereumLabel,
EthereumTransaction,
PolygonBlock,
PolygonLabel,
PolygonTransaction,
)
from psycopg2.errors import UniqueViolation # type: ignore
@ -78,6 +80,24 @@ def get_block_model(
return block_model
def get_label_model(
blockchain_type: AvailableBlockchainType,
) -> Type[Union[EthereumLabel, PolygonLabel]]:
"""
Depends on provided blockchain type: Ethereum or Polygon,
set proper block label model: EthereumLabel or PolygonLabel.
"""
label_model: Type[Union[EthereumLabel, PolygonLabel]]
if blockchain_type == AvailableBlockchainType.ETHEREUM:
label_model = EthereumLabel
elif blockchain_type == AvailableBlockchainType.POLYGON:
label_model = PolygonLabel
else:
raise Exception("Unsupported blockchain type provided")
return label_model
def get_transaction_model(
blockchain_type: AvailableBlockchainType,
) -> Type[Union[EthereumTransaction, PolygonTransaction]]:

Wyświetl plik

@ -1,7 +1,13 @@
import os
from typing import cast
from bugout.app import Bugout
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
bc = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN")
# Geth connection address
@ -49,3 +55,5 @@ if MOONSTREAM_ADMIN_ACCESS_TOKEN == "":
MOONSTREAM_DATA_JOURNAL_ID = os.environ.get("MOONSTREAM_DATA_JOURNAL_ID", "")
if MOONSTREAM_DATA_JOURNAL_ID == "":
raise ValueError("MOONSTREAM_DATA_JOURNAL_ID env variable is not set")
AWS_S3_SMARTCONTRACTS_ABI_PREFIX = os.getenv("AWS_S3_SMARTCONTRACTS_ABI_PREFIX")

Wyświetl plik

@ -1,35 +1,34 @@
"""
Generates dashboard.
"""
import argparse
from datetime import timedelta, datetime
from enum import Enum
import hashlib
import logging
import json
import os
from typing import Any, Dict, List, Callable
import logging
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List
import boto3
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumLabel, EthereumTransaction, EthereumBlock
from bugout.app import Bugout
import boto3 # type: ignore
from bugout.data import BugoutResources
from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy import Column, Date, and_, func, text
from sqlalchemy.orm import Query, Session
from sqlalchemy.orm import Session, Query
from sqlalchemy import func, text, and_, Date, Column
from ..blockchain import get_block_model, get_label_model, get_transaction_model
from ..data import AvailableBlockchainType
from ..settings import (
AWS_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
bc,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Bugout
lable_filters = {"Transfer": "nft_transfer"}
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
DATA_BUCKET = ""
class TimeScale(Enum):
year = "year"
@ -52,19 +51,9 @@ timescales_delta: Dict[str, Dict[str, timedelta]] = {
"day": {"timedelta": timedelta(hours=24)},
}
bc = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
MOONSTREAM_ADMIN_ACCESS_TOKEN = os.getenv("MOONSTREAM_ADMIN_ACCESS_TOKEN")
AWS_S3_SMARTCONTRACTS_ABI_PREFIX = os.getenv("AWS_S3_SMARTCONTRACTS_ABI_PREFIX")
def push_statistics(
statistics_data: Dict[str, Any],
@ -90,12 +79,18 @@ def push_statistics(
def generate_metrics(
db_session: Session, address: str, timescale: str, metrics: List[str], start: Any
db_session: Session,
blockchain_type: AvailableBlockchainType,
address: str,
timescale: str,
metrics: List[str],
start: Any,
):
"""
Generage metrics
"""
block_model = get_block_model(blockchain_type)
transaction_model = get_transaction_model(blockchain_type)
start = start
end = datetime.utcnow()
@ -137,16 +132,16 @@ def generate_metrics(
db_session.query(
func.count(statistic_column).label("count"),
func.to_char(
func.to_timestamp(EthereumBlock.timestamp).cast(Date), time_format
func.to_timestamp(block_model.timestamp).cast(Date), time_format
).label("timeseries_points"),
)
.join(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
block_model,
transaction_model.block_number == block_model.block_number,
)
.filter(identifying_column == address)
.filter(EthereumBlock.timestamp >= start_timestamp)
.filter(EthereumBlock.timestamp <= end_timestamp)
.filter(block_model.timestamp >= start_timestamp)
.filter(block_model.timestamp <= end_timestamp)
.group_by(text("timeseries_points"))
).subquery(name="metric_counts")
@ -184,8 +179,8 @@ def generate_metrics(
results["transactions_out"] = make_query(
db_session,
EthereumTransaction.from_address,
EthereumTransaction.hash,
transaction_model.from_address,
transaction_model.hash,
)
print("--- transactions_out %s seconds ---" % (time.time() - start_time))
@ -193,8 +188,8 @@ def generate_metrics(
start_time = time.time()
results["transactions_in"] = make_query(
db_session,
EthereumTransaction.to_address,
EthereumTransaction.hash,
transaction_model.to_address,
transaction_model.hash,
)
print("--- transactions_in %s seconds ---" % (time.time() - start_time))
@ -202,16 +197,16 @@ def generate_metrics(
start_time = time.time()
results["value_out"] = make_query(
db_session,
EthereumTransaction.from_address,
EthereumTransaction.value,
transaction_model.from_address,
transaction_model.value,
)
print("--- value_out %s seconds ---" % (time.time() - start_time))
start_time = time.time()
results["value_in"] = make_query(
db_session,
EthereumTransaction.to_address,
EthereumTransaction.value,
transaction_model.to_address,
transaction_model.value,
)
print("--- value_in %s seconds ---" % (time.time() - start_time))
@ -224,8 +219,14 @@ def generate_metrics(
def generate_data(
db_session: Session, address: str, timescale: str, functions: List[str], start: Any
db_session: Session,
blockchain_type: AvailableBlockchainType,
address: str,
timescale: str,
functions: List[str],
start: Any,
):
label_model = get_label_model(blockchain_type)
# create empty time series
@ -251,19 +252,19 @@ def generate_data(
# get distinct tags labels in that range
label_requested = (
db_session.query(EthereumLabel.label.label("label"))
.filter(EthereumLabel.address == address)
.filter(EthereumLabel.label.in_(functions))
db_session.query(label_model.label.label("label"))
.filter(label_model.address == address)
.filter(label_model.label.in_(functions))
.distinct()
)
if start is not None:
label_requested = label_requested.filter(
func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) > start
func.to_timestamp(label_model.block_timestamp).cast(Date) > start
)
if end is not None:
label_requested = label_requested.filter(
func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) < end
func.to_timestamp(label_model.block_timestamp).cast(Date) < end
)
label_requested = label_requested.subquery(name="label_requested")
@ -284,28 +285,28 @@ def generate_data(
label_counts = (
db_session.query(
func.to_char(
func.to_timestamp(EthereumLabel.block_timestamp).cast(Date), time_format
func.to_timestamp(label_model.block_timestamp).cast(Date), time_format
).label("timeseries_points"),
func.count(EthereumLabel.id).label("count"),
EthereumLabel.label.label("label"),
func.count(label_model.id).label("count"),
label_model.label.label("label"),
)
.filter(EthereumLabel.label.in_(functions))
.filter(EthereumLabel.address == address)
.filter(label_model.label.in_(functions))
.filter(label_model.address == address)
)
if start is not None:
label_counts = label_counts.filter(
func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) > start
func.to_timestamp(label_model.block_timestamp).cast(Date) > start
)
if end is not None:
label_counts = label_counts.filter(
func.to_timestamp(EthereumLabel.block_timestamp).cast(Date) < end
func.to_timestamp(label_model.block_timestamp).cast(Date) < end
)
label_counts_subquery = (
label_counts.group_by(
text("timeseries_points"),
EthereumLabel.label,
label_model.label,
)
.order_by(text("timeseries_points desc"))
.subquery(name="label_counts")
@ -355,132 +356,142 @@ def generate_data(
return response_labels
def crawlers_start(db_session):
# read all subscriptions
required_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, "abi": "true"},
timeout=10,
)
print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
s3_client = boto3.client("s3")
# already proccessd
already_processed = []
for subscription in required_subscriptions.resources:
if (
subscription.resource_data["address"]
!= "0x06012c8cf97BEaD5deAe237070F9587f8E7A266d"
):
continue
bucket = subscription.resource_data["bucket"]
key = subscription.resource_data["s3_path"]
address = subscription.resource_data["address"]
print(f"Expected bucket: s3://{bucket}/{key}")
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
abi_string = json.dumps(abi_json, sort_keys=True, indent=2)
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
if f"{address}/{hash}" in already_processed:
continue
s3_data_object = {}
abi_functions = [item for item in abi_json if item["type"] == "function"]
abi_events = [item for item in abi_json if item["type"] == "event"]
for timescale in [timescale.value for timescale in TimeScale]:
start_date = datetime.utcnow() - timescales_delta[timescale]["timedelta"]
print(f"Timescale: {timescale}")
abi_functions_names = [item["name"] for item in abi_functions]
functions_calls_data = generate_data(
db_session, address, timescale, abi_functions_names, start=start_date
)
s3_data_object["functions"] = functions_calls_data
# generate data
abi_events_names = [
item["name"]
if item["name"] not in lable_filters
else lable_filters[item["name"]]
for item in abi_events
]
events_data = generate_data(
db_session,
address,
timescale,
abi_events_names,
start=start_date,
)
s3_data_object["events"] = events_data
s3_data_object["metrics"] = generate_metrics(
db_session,
address,
timescale,
abi_events_names,
start=start_date,
)
push_statistics(
statistics_data=s3_data_object,
subscription=subscription,
timescale=timescale,
bucket=bucket,
hash=hash,
)
already_processed.append(f"{address}/{hash}")
time.sleep(10)
def stats_generate_handler(args: argparse.Namespace):
"""
Start crawler with generate.
"""
blockchain_type = AvailableBlockchainType(args.blockchain)
with yield_db_session_ctx() as db_session:
crawlers_start(db_session)
# read all subscriptions
required_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, "abi": "true"},
timeout=10,
)
print(f"Subscriptions for processing: {len(required_subscriptions.resources)}")
s3_client = boto3.client("s3")
# already proccessd
already_processed = []
for subscription in required_subscriptions.resources:
if (
subscription.resource_data["address"]
!= "0x06012c8cf97BEaD5deAe237070F9587f8E7A266d"
):
continue
bucket = subscription.resource_data["bucket"]
key = subscription.resource_data["s3_path"]
address = subscription.resource_data["address"]
print(f"Expected bucket: s3://{bucket}/{key}")
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
abi_string = json.dumps(abi_json, sort_keys=True, indent=2)
hash = hashlib.md5(abi_string.encode("utf-8")).hexdigest()
if f"{address}/{hash}" in already_processed:
continue
s3_data_object = {}
abi_functions = [item for item in abi_json if item["type"] == "function"]
abi_events = [item for item in abi_json if item["type"] == "event"]
for timescale in [timescale.value for timescale in TimeScale]:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
print(f"Timescale: {timescale}")
abi_functions_names = [item["name"] for item in abi_functions]
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=abi_functions_names,
start=start_date,
)
s3_data_object["functions"] = functions_calls_data
# generate data
abi_events_names = [
item["name"]
if item["name"] not in lable_filters
else lable_filters[item["name"]]
for item in abi_events
]
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=abi_events_names,
start=start_date,
)
s3_data_object["events"] = events_data
s3_data_object["metrics"] = generate_metrics(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
metrics=abi_events_names,
start=start_date,
)
push_statistics(
statistics_data=s3_data_object,
subscription=subscription,
timescale=timescale,
bucket=bucket,
hash=hash,
)
already_processed.append(f"{address}/{hash}")
time.sleep(10)
def main() -> None:
parser = argparse.ArgumentParser(description="Command Line Interface")
parser.set_defaults(func=lambda _: parser.print_help())
subcommands = parser.add_subparsers(description="Dashboard worker commands")
# Statistics parser
parser_statistics = subcommands.add_parser(
"statistics", description="Drone statistics"
)
parser_statistics.set_defaults(func=lambda _: parser_statistics.print_help())
subcommands_statistics = parser_statistics.add_subparsers(
subcommands = parser.add_subparsers(
description="Drone dashboard statistics commands"
)
parser_statistics_generate = subcommands_statistics.add_parser(
# Statistics parser
parser_generate = subcommands.add_parser(
"generate", description="Generate statistics"
)
parser_generate.set_defaults(func=lambda _: parser_generate.print_help())
parser_generate.add_argument(
"--blockchain",
required=True,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
parser_generate.set_defaults(func=stats_generate_handler)
parser_statistics_generate.set_defaults(func=stats_generate_handler)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.1.0"
MOONCRAWL_VERSION = "0.1.1"

Wyświetl plik

@ -50,11 +50,12 @@ setup(
entry_points={
"console_scripts": [
"crawler=mooncrawl.crawler:main",
"esd=mooncrawl.esd:main",
"identity=mooncrawl.identity:main",
"etherscan=mooncrawl.etherscan:main",
"nft=mooncrawl.nft.cli:main",
"contractcrawler=mooncrawl.contract.cli:main",
"esd=mooncrawl.esd:main",
"etherscan=mooncrawl.etherscan:main",
"identity=mooncrawl.identity:main",
"nft=mooncrawl.nft.cli:main",
"statistics=mooncrawl.stats_worker.dashboard:main",
]
},
)