isort and init state of api tasks.

pull/516/head
Andrey Dolgolev 2021-12-16 15:26:04 +02:00
rodzic f5529eb8e6
commit 9783690f8b
22 zmienionych plików z 325 dodań i 114 usunięć

Wyświetl plik

@ -1,15 +1,16 @@
import argparse
import binascii
import sys
from typing import List, Optional, Union, Type, cast
from typing import List, Optional, Type, Union, cast
import pyevmasm
from moonstreamdb.db import yield_db_session
from moonstreamdb.models import ESDEventSignature, ESDFunctionSignature
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import text
from .data import EVMEventSignature, EVMFunctionSignature, ContractABI
from moonstreamdb.db import yield_db_session
from .data import ContractABI, EVMEventSignature, EVMFunctionSignature
def query_for_text_signatures(

Wyświetl plik

@ -1,38 +1,33 @@
import json
import logging
from typing import Optional, Dict, Any, Union
from enum import Enum
import uuid
from enum import Enum
from typing import Any, Dict, Optional, Union
import boto3 # type: ignore
from bugout.data import BugoutSearchResults
from bugout.data import BugoutResource, BugoutSearchResults
from bugout.journal import SearchOrder
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore
from moonstreamdb.models import (
EthereumLabel,
)
from moonstreamdb.models import EthereumLabel
from sqlalchemy import text
from sqlalchemy.orm import Session
from web3 import Web3
from web3._utils.validation import validate_abi
from .middleware import MoonstreamHTTPException
from . import data
from .reporter import reporter
from .middleware import MoonstreamHTTPException
from .settings import ETHERSCAN_SMARTCONTRACTS_BUCKET
from bugout.data import BugoutResource
from .reporter import reporter
from .settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
ETHERSCAN_SMARTCONTRACTS_BUCKET,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from web3 import Web3
from .settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -2,16 +2,15 @@
Moonstream CLI
"""
import argparse
import logging
import json
import logging
import os
from posix import listdir
from typing import Optional
from sqlalchemy.orm import with_expression
from moonstreamdb.db import SessionLocal
from sqlalchemy.orm import with_expression
from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID
from ..web3_provider import yield_web3_provider

Wyświetl plik

@ -7,12 +7,9 @@ from typing import Dict, List, Optional, Union
from bugout.data import BugoutResources
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from .. import reporter
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_ADMIN_ACCESS_TOKEN
from ..settings import bugout_client as bc
def migrate_subscriptions(

Wyświetl plik

@ -8,16 +8,15 @@ from typing import Dict
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from . import actions
from . import data
from . import actions, data
from .middleware import BroodAuthMiddleware, MoonstreamHTTPException
from .routes.address_info import router as addressinfo_router
from .routes.dashboards import router as dashboards_router
from .routes.streams import router as streams_router
from .routes.subscriptions import router as subscriptions_router
from .routes.txinfo import router as txinfo_router
from .routes.users import router as users_router
from .routes.whales import router as whales_router
from .routes.dashboards import router as dashboards_router
from .middleware import BroodAuthMiddleware, MoonstreamHTTPException
from .settings import DOCS_TARGET_PATH, ORIGINS
from .version import MOONSTREAMAPI_VERSION

Wyświetl plik

@ -7,7 +7,8 @@ from fastapi import HTTPException, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from .reporter import reporter
from .settings import MOONSTREAM_APPLICATION_ID, bugout_client as bc
from .settings import MOONSTREAM_APPLICATION_ID
from .settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -32,9 +32,9 @@ from bugout.app import Bugout
from bugout.data import BugoutResource
from sqlalchemy.orm import Session
from . import bugout, transactions, moonworm_provider
from .. import data
from ..stream_queries import StreamQuery
from . import bugout, moonworm_provider, transactions
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -1,9 +1,9 @@
"""
Event providers powered by Bugout journals.
"""
from datetime import datetime
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from bugout.app import Bugout
@ -14,9 +14,8 @@ from dateutil.tz import UTC
from sqlalchemy.orm import Session
from .. import data
from ..stream_queries import StreamQuery
from ..settings import HUMBUG_TXPOOL_CLIENT_ID
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -1,24 +1,18 @@
from dataclasses import dataclass, field
import logging
from typing import cast, Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, cast
from bugout.app import Bugout
from bugout.data import BugoutResource
from moonstreamdb.blockchain import (
get_label_model,
AvailableBlockchainType,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session, Query, query_expression
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from sqlalchemy import and_, or_, text
from sqlalchemy.orm import Query, Session, query_expression
from sqlalchemy.sql.expression import label
from .. import data
from ..stream_boundaries import validate_stream_boundary
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -1,25 +1,22 @@
from dataclasses import dataclass, field
import logging
from typing import cast, Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, cast
from bugout.app import Bugout
from bugout.data import BugoutResource
from moonstreamdb.blockchain import (
get_label_model,
get_block_model,
get_transaction_model,
AvailableBlockchainType,
get_block_model,
get_label_model,
get_transaction_model,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session, Query
from sqlalchemy import and_, or_, text
from sqlalchemy.orm import Query, Session
from .. import data
from ..stream_boundaries import validate_stream_boundary
from ..stream_queries import StreamQuery
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARN)

Wyświetl plik

@ -2,12 +2,12 @@ import logging
from typing import Optional
from fastapi import APIRouter, Depends, Query
from moonstreamdb.db import yield_db_session
from sqlalchemy.orm import Session
from web3 import Web3
from .. import actions
from .. import data
from moonstreamdb.db import yield_db_session
from .. import actions, data
from ..middleware import MoonstreamHTTPException
from ..web3_provider import yield_web3_provider

Wyświetl plik

@ -1,25 +1,26 @@
import json
import logging
from os import read
import json
from typing import Any, List, Optional, Dict
from typing import Any, Dict, List, Optional
from uuid import UUID
import boto3 # type: ignore
import requests
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Request, Query, Body
from fastapi import APIRouter, Body, Query, Request
from .. import actions
from .. import data
from .. import actions, data
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_CRAWLERS_INTERNAL,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
@ -347,7 +348,7 @@ async def get_dashboard_data_links_handler(
request: Request, dashboard_id: str
) -> Dict[UUID, Any]:
"""
Update dashboards mainly fully overwrite name and subscription metadata
Get s3 presign urls for dshaboard grafics
"""
token = request.state.token
@ -431,3 +432,25 @@ async def get_dashboard_data_links_handler(
)
return stats
@router.get("/{dashboard_id}/stats_update", tags=["dashboards"])
async def update_dashbord_data_handler(
request: Request, dashboard_id: str, timescale: str
) -> Dict[str, Any]:
"""
Return journal statistics
journal.read permission required.
"""
token = request.state.token
requests.get(
f"{MOONSTREAM_CRAWLERS_INTERNAL}/jobs/stats_update",
params={
"dashboard_id": dashboard_id,
"timescale": timescale,
"token": token,
},
)
return {"status": "task send"}

Wyświetl plik

@ -5,11 +5,12 @@ import logging
from typing import Any, Dict, List, Optional
from bugout.data import BugoutResource
from fastapi import APIRouter, Request, Query, Depends
from moonstreamdb import db
from fastapi import APIRouter, Depends, Query, Request
from sqlalchemy.orm import Session
from .. import data
from moonstreamdb import db
from .. import data, stream_queries
from ..middleware import MoonstreamHTTPException
from ..providers import (
ReceivingEventsException,
@ -20,12 +21,11 @@ from ..providers import (
previous_event,
)
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from .. import stream_queries
from ..settings import bugout_client as bc
from .subscriptions import BUGOUT_RESOURCE_TYPE_SUBSCRIPTION
logger = logging.getLogger(__name__)

Wyświetl plik

@ -2,31 +2,27 @@
The Moonstream subscriptions HTTP API
"""
import hashlib
import logging
import json
from typing import List, Optional, Dict, Any
import logging
from typing import Any, Dict, List, Optional
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Depends, Request, Form
from fastapi import APIRouter, Depends, Form, Request
from web3 import Web3
from ..actions import (
validate_abi_json,
upload_abi_to_s3,
)
from ..admin import subscription_types
from .. import data
from ..actions import upload_abi_to_s3, validate_abi_json
from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from ..settings import bugout_client as bc
from ..web3_provider import yield_web3_provider
logger = logging.getLogger(__name__)

Wyświetl plik

@ -9,12 +9,12 @@ import logging
from typing import Optional
from fastapi import APIRouter, Depends
from moonstreamdb.db import yield_db_session
from sqlalchemy.orm import Session
from moonstreamdb.db import yield_db_session
from .. import actions, data
from ..abi_decoder import decode_abi
from .. import actions
from .. import data
logger = logging.getLogger(__name__)

Wyświetl plik

@ -2,27 +2,18 @@
The Moonstream users HTTP API
"""
import logging
from typing import Any, Dict, Optional
import uuid
from typing import Any, Dict, Optional
from bugout.data import BugoutToken, BugoutUser, BugoutResource, BugoutUserTokens
from bugout.data import BugoutResource, BugoutToken, BugoutUser, BugoutUserTokens
from bugout.exceptions import BugoutResponseException
from fastapi import (
APIRouter,
Body,
Form,
Request,
)
from fastapi import APIRouter, Body, Form, Request
from .. import data
from ..middleware import MoonstreamHTTPException
from ..settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from ..actions import create_onboarding_resource
from ..middleware import MoonstreamHTTPException
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_APPLICATION_ID
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -7,15 +7,16 @@ import logging
from typing import Optional
from fastapi import APIRouter, Depends, Query
from moonstreamdb import db
from sqlalchemy.orm import Session
from moonstreamdb import db
from .. import data
from ..providers.bugout import ethereum_whalewatch_provider
from ..settings import (
bugout_client,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
bugout_client,
)
from ..stream_queries import StreamQuery

Wyświetl plik

@ -84,3 +84,8 @@ if MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX is None:
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX = (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX.rstrip("/")
)
MOONSTREAM_CRAWLERS_INTERNAL = os.environ.get("MOONSTREAM_CRAWLERS_INTERNAL")
if MOONSTREAM_CRAWLERS_INTERNAL is None:
raise ValueError("MOONSTREAM_CRAWLERS_INTERNAL environment variable must be set")
MOONSTREAM_CRAWLERS_INTERNAL = MOONSTREAM_CRAWLERS_INTERNAL.rstrip("/")

Wyświetl plik

@ -1,9 +1,9 @@
"""
Stream queries - data structure, and parser.
"""
from dataclasses import dataclass, field
import logging
from typing import cast, List, Tuple
from dataclasses import dataclass, field
from typing import List, Tuple, cast
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

Wyświetl plik

@ -1,10 +1,9 @@
import unittest
from .data import DashboardMeta
from .actions import dashboards_abi_validation
from .data import DashboardMeta
from .middleware import MoonstreamHTTPException
abi_example = [
{
"inputs": [

Wyświetl plik

@ -5,13 +5,14 @@ import logging
import time
from typing import Dict
from fastapi import FastAPI
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from . import data
from .middleware import MoonstreamHTTPException
from .settings import DOCS_TARGET_PATH, ORIGINS
from .version import MOONCRAWL_VERSION
from .stats_worker import dashboard
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -66,14 +67,21 @@ async def now_handler() -> data.NowResponse:
@app.get("/jobs/stats_update", tags=["jobs"])
async def status_handler():
async def status_handler(
dashboard_id: str, timescale: str, token: str, background_tasks: BackgroundTasks
):
"""
Find latest crawlers records with creation timestamp:
- ethereum_txpool
- ethereum_trending
Update dashboard endpoint create are tasks for update.
"""
try:
pass
background_tasks.add_task(
dashboard.stats_generate_api_task,
token=token,
timescale=timescale,
dashboard_id=dashboard_id,
)
except Exception as e:
logger.error(f"Unhandled status exception, error: {e}")
raise MoonstreamHTTPException(status_code=500)

Wyświetl plik

@ -574,7 +574,6 @@ def stats_generate_handler(args: argparse.Namespace):
# ethereum_blockchain
start_time = time.time()
blockchain_type = AvailableBlockchainType(args.blockchain)
# polygon_blockchain
dashboard_resources: BugoutResources = bc.list_resources(
@ -800,6 +799,213 @@ def stats_generate_handler(args: argparse.Namespace):
)
def stats_generate_api_task(token: UUID, timescale: str, dashboard_id: str):
"""
Start crawler with generate.
"""
with yield_db_session_ctx() as db_session:
dashboard: BugoutResource = bc.get_resource(
token=token,
resource_id=dashboard_id,
timeout=10,
)
# get all user subscriptions
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION},
timeout=10,
)
# Create subscriptions dict for get subscriptions by id.
subscription_by_id = {
str(blockchain_subscription.id): blockchain_subscription
for blockchain_subscription in blockchain_subscriptions.resources
}
print(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
for dashboard_subscription_filters in dashboard.resource_data[
"dashboard_subscriptions"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
blockchain_type = AvailableBlockchainType(
subscription_by_id[subscription_id].resource_data[
"subscription_type_id"
]
)
s3_data_object: Dict[str, Any] = {}
extention_data = []
address = subscription_by_id[subscription_id].resource_data["address"]
generic = dashboard_subscription_filters["generic"]
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
else:
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(
Bucket=bucket,
Key=key,
)
abi_json = json.loads(abi["Body"].read())
methods = generate_list_of_names(
type="function",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_methods"],
abi_json=abi_json,
)
events = generate_list_of_names(
type="event",
subscription_filters=dashboard_subscription_filters,
read_abi=dashboard_subscription_filters["all_events"],
abi_json=abi_json,
)
abi_external_calls = [
item for item in abi_json if item["type"] == "external_call"
]
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
)
extention_data.append(
{
"display_name": "Overall unique token owners.",
"value": get_unique_address(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
),
}
)
if "HatchStartedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches started.",
"value": get_count(
name="HatchStartedEvent",
type="event",
db_session=db_session,
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
),
}
)
if "HatchFinishedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches finished.",
"value": get_count(
name="HatchFinishedEvent",
type="event",
db_session=db_session,
select_expression=distinct(
get_label_model(blockchain_type).label_data["args"][
"tokenId"
]
),
blockchain_type=blockchain_type,
address=address,
),
}
)
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
print(f"Timescale: {timescale}")
s3_data_object["blocks_state"] = current_blocks_state
s3_data_object["web3_metric"] = extention_data
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=methods,
start=start_date,
metric_type="tx_call",
)
s3_data_object["functions"] = functions_calls_data
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=events,
start=start_date,
metric_type="event",
)
s3_data_object["events"] = events_data
s3_data_object["generic"] = generate_metrics(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
metrics=generic,
start=start_date,
)
push_statistics(
statistics_data=s3_data_object,
subscription=subscription_by_id[subscription_id],
timescale=timescale,
bucket=bucket,
dashboard_id=dashboard.id,
)
except Exception as err:
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard.id}",
],
)
print(err)
def main() -> None:
parser = argparse.ArgumentParser(description="Command Line Interface")
parser.set_defaults(func=lambda _: parser.print_help())