Mooncrawl entity migration to spire

pull/877/head
kompotkot 2023-08-01 08:58:36 +00:00
rodzic f5b85d0b5f
commit 92dbfe5576
8 zmienionych plików z 64 dodań i 73 usunięć

Wyświetl plik

@ -82,7 +82,7 @@ def get_entity_subscription_collection_id(
resource_type: str, resource_type: str,
token: Union[uuid.UUID, str], token: Union[uuid.UUID, str],
user_id: uuid.UUID, user_id: uuid.UUID,
) -> Optional[str]: ) -> str:
""" """
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
""" """

Wyświetl plik

@ -9,41 +9,39 @@ from typing import Any, Dict, List
from uuid import UUID from uuid import UUID
import boto3 # type: ignore import boto3 # type: ignore
from bugout.data import BugoutResource from bugout.data import BugoutJournalEntity, BugoutResource
from entity.data import EntityResponse # type: ignore
from fastapi import BackgroundTasks, FastAPI from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb.blockchain import ( from moonstreamdb.blockchain import (
AvailableBlockchainType, AvailableBlockchainType,
get_label_model,
get_block_model, get_block_model,
get_label_model,
get_transaction_model, get_transaction_model,
) )
from sqlalchemy import text from sqlalchemy import text
from .actions import (
generate_s3_access_links,
query_parameter_hash,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
)
from . import data from . import data
from .actions import (
EntityCollectionNotFoundException,
generate_s3_access_links,
get_entity_subscription_collection_id,
query_parameter_hash,
)
from .middleware import MoonstreamHTTPException from .middleware import MoonstreamHTTPException
from .settings import ( from .settings import (
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION, BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN, BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
DOCS_TARGET_PATH, DOCS_TARGET_PATH,
LINKS_EXPIRATION_TIME,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_QUERIES_BUCKET, MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID, NB_CONTROLLER_ACCESS_ID,
ORIGINS, ORIGINS,
LINKS_EXPIRATION_TIME,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
) )
from .settings import bugout_client as bc, entity_client as ec from .settings import bugout_client as bc
from .stats_worker import dashboard, queries from .stats_worker import dashboard, queries
from .version import MOONCRAWL_VERSION from .version import MOONCRAWL_VERSION
@ -115,12 +113,11 @@ async def status_handler(
) )
try: try:
collection_id = get_entity_subscription_collection_id( journal_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION, resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=UUID(stats_update.user_id), user_id=UUID(stats_update.user_id),
) )
except EntityCollectionNotFoundException as e: except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException( raise MoonstreamHTTPException(
status_code=404, status_code=404,
@ -136,20 +133,19 @@ async def status_handler(
s3_client = boto3.client("s3") s3_client = boto3.client("s3")
subscription_by_id: Dict[str, EntityResponse] = {} subscription_by_id: Dict[str, BugoutJournalEntity] = {}
for dashboard_subscription_filters in dashboard_resource.resource_data[ for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings" "subscription_settings"
]: ]:
# get subscription by id # get subscription by id
subscription: BugoutJournalEntity = bc.get_entity(
subscription: EntityResponse = ec.get_entity(
token=stats_update.token, token=stats_update.token,
collection_id=collection_id, journal_id=journal_id,
entity_id=dashboard_subscription_filters["subscription_id"], entity_id=dashboard_subscription_filters["subscription_id"],
) )
subscription_by_id[str(subscription.entity_id)] = subscription subscription_by_id[str(subscription.id)] = subscription
try: try:
background_tasks.add_task( background_tasks.add_task(
@ -182,7 +178,7 @@ async def status_handler(
subscriprions_type = reqired_field["subscription_type_id"] subscriprions_type = reqired_field["subscription_type_id"]
for timescale in stats_update.timescales: for timescale in stats_update.timescales:
presigned_urls_response[subscription_entity.entity_id] = {} presigned_urls_response[subscription_entity.id] = {}
try: try:
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscriprions_type]}/contracts_data/{subscription_entity.address}/{stats_update.dashboard_id}/v1/{timescale}.json" result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscriprions_type]}/contracts_data/{subscription_entity.address}/{stats_update.dashboard_id}/v1/{timescale}.json"
@ -201,7 +197,7 @@ async def status_handler(
HttpMethod="GET", HttpMethod="GET",
) )
presigned_urls_response[subscription_entity.entity_id][timescale] = { presigned_urls_response[subscription_entity.id][timescale] = {
"url": stats_presigned_url, "url": stats_presigned_url,
"headers": { "headers": {
"If-Modified-Since": ( "If-Modified-Since": (

Wyświetl plik

@ -2,11 +2,11 @@ import argparse
import json import json
import logging import logging
import os import os
from typing import Any, Dict from typing import cast, List
import uuid import uuid
import requests # type: ignore import requests # type: ignore
from bugout.data import BugoutSearchResult
from .utils import get_results_for_moonstream_query from .utils import get_results_for_moonstream_query
from ..settings import ( from ..settings import (
@ -49,17 +49,18 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
limit=100, limit=100,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
) )
leaderboards_results = cast(List[BugoutSearchResult], leaderboards.results)
except Exception as e: except Exception as e:
logger.error(f"Could not get leaderboards from journal: {e}") logger.error(f"Could not get leaderboards from journal: {e}")
return return
if len(leaderboards.results) == 0: if len(leaderboards_results) == 0:
logger.error("No leaderboard found") logger.error("No leaderboard found")
return return
logger.info(f"Found {len(leaderboards.results)} leaderboards") logger.info(f"Found {len(leaderboards_results)} leaderboards")
for leaderboard in leaderboards.results: for leaderboard in leaderboards_results:
logger.info( logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}" f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
) )

Wyświetl plik

@ -195,7 +195,7 @@ def get_crawl_job_entries(
query += f" created_at:>={created_at_filter}" query += f" created_at:>={created_at_filter}"
current_offset = 0 current_offset = 0
entries = [] entries: List[BugoutSearchResult] = []
while True: while True:
search_result = bugout_client.search( search_result = bugout_client.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -205,10 +205,11 @@ def get_crawl_job_entries(
limit=limit, limit=limit,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
) )
entries.extend(search_result.results) search_results = cast(List[BugoutSearchResult], search_result.results)
entries.extend(search_results)
# if len(entries) >= search_result.total_results: # if len(entries) >= search_result.total_results:
if len(search_result.results) == 0: if len(search_results) == 0:
break break
current_offset += limit current_offset += limit
return entries return entries
@ -402,8 +403,9 @@ def _get_heartbeat_entry_id(
limit=1, limit=1,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
) )
if entries.results: search_results = cast(List[BugoutSearchResult], entries.results)
return entries.results[0].entry_url.split("/")[-1] if search_results:
return search_results[0].entry_url.split("/")[-1]
else: else:
logger.info(f"No {crawler_type} heartbeat entry found, creating one") logger.info(f"No {crawler_type} heartbeat entry found, creating one")
entry = bugout_client.create_entry( entry = bugout_client.create_entry(

Wyświetl plik

@ -3,10 +3,8 @@ from typing import Dict, Optional
from uuid import UUID from uuid import UUID
from bugout.app import Bugout from bugout.app import Bugout
from entity.client import Entity # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.blockchain import AvailableBlockchainType
# Bugout # Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev") BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
@ -14,15 +12,6 @@ BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev"
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL) bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
# Entity
MOONSTREAM_ENTITY_URL = os.environ.get("MOONSTREAM_ENTITY_URL", "")
if MOONSTREAM_ENTITY_URL == "":
raise ValueError("MOONSTREAM_ENTITY_URL environment variable must be set")
entity_client = Entity(MOONSTREAM_ENTITY_URL)
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to") MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
MOONSTREAM_ENGINE_URL = os.environ.get( MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to" "MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"

Wyświetl plik

@ -6,21 +6,25 @@ import hashlib
import json import json
import logging import logging
import time import time
import traceback
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, cast, Dict, List, Optional, Union
from uuid import UUID from uuid import UUID
import boto3 # type: ignore import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources from bugout.data import (
from entity.data import EntityResponse, EntityCollectionResponse # type: ignore BugoutJournalEntity,
BugoutResource,
BugoutResources,
BugoutSearchResultAsEntity,
)
from moonstreamdb.blockchain import ( from moonstreamdb.blockchain import (
AvailableBlockchainType, AvailableBlockchainType,
get_label_model, get_label_model,
get_transaction_model, get_transaction_model,
) )
from sqlalchemy import and_, cast, distinct, extract, func, text from sqlalchemy import and_, distinct, extract, func, text
from sqlalchemy import cast as sqlalchemy_cast
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy.sql.operators import in_op from sqlalchemy.sql.operators import in_op
from web3 import Web3 from web3 import Web3
@ -29,14 +33,14 @@ from ..blockchain import connect
from ..db import yield_db_read_only_session_ctx from ..db import yield_db_read_only_session_ctx
from ..reporter import reporter from ..reporter import reporter
from ..settings import ( from ..settings import (
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
CRAWLER_LABEL, CRAWLER_LABEL,
MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX, MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID, NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
) )
from ..settings import bugout_client as bc, entity_client as ec from ..settings import bugout_client as bc
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -157,11 +161,13 @@ def generate_data(
.filter(in_op(label_model.label_data["name"].astext, functions)) .filter(in_op(label_model.label_data["name"].astext, functions))
.filter( .filter(
label_model.block_timestamp label_model.block_timestamp
>= cast(extract("epoch", start), label_model.block_timestamp.type) >= sqlalchemy_cast(
extract("epoch", start), label_model.block_timestamp.type
)
) )
.filter( .filter(
label_model.block_timestamp label_model.block_timestamp
< cast( < sqlalchemy_cast(
extract("epoch", (start + timescales_delta[timescale]["timedelta"])), extract("epoch", (start + timescales_delta[timescale]["timedelta"])),
label_model.block_timestamp.type, label_model.block_timestamp.type,
) )
@ -652,25 +658,26 @@ def stats_generate_handler(args: argparse.Namespace):
address_dashboard_id_subscription_id_tree: Dict[str, Any] = {} address_dashboard_id_subscription_id_tree: Dict[str, Any] = {}
for user_id, collection_id in user_collection_by_id.items(): for user_id, journal_id in user_collection_by_id.items():
# request all subscriptions for user # request all subscriptions for user
user_subscriptions: EntityCollectionResponse = ec.search_entities( user_subscriptions = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id, journal_id=journal_id,
required_field=[ query=f"tag:subscription_type_id:{subscription_id_by_blockchain[args.blockchain]}",
"subscription_type_id:{}".format( representation="entity",
subscription_id_by_blockchain[args.blockchain]
)
],
) )
user_subscriptions_results = cast(
List[BugoutSearchResultAsEntity], user_subscriptions.results
)
logger.info( logger.info(
f"Amount of user subscriptions: {len(user_subscriptions.entities)}" f"Amount of user subscriptions: {len(user_subscriptions_results)}"
) )
for subscription in user_subscriptions.entities: for subscription in user_subscriptions_results:
subscription_id = str(subscription.entity_id) entity_url_list = subscription.entity_url.split("/")
subscription_id = entity_url_list[len(entity_url_list) - 1]
if subscription_id not in dashboards_by_subscription: if subscription_id not in dashboards_by_subscription:
logger.info( logger.info(
@ -1014,7 +1021,7 @@ def stats_generate_handler(args: argparse.Namespace):
def stats_generate_api_task( def stats_generate_api_task(
timescales: List[str], timescales: List[str],
dashboard: BugoutResource, dashboard: BugoutResource,
subscription_by_id: Dict[str, EntityResponse], subscription_by_id: Dict[str, BugoutJournalEntity],
access_id: Optional[UUID] = None, access_id: Optional[UUID] = None,
): ):
""" """

Wyświetl plik

@ -4,9 +4,6 @@ export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>" export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>"
# Entity environment variables
export MOONSTREAM_ENTITY_URL="https://api.moonstream.to/entity"
# Engine environment variables # Engine environment variables
export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to" export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to"

Wyświetl plik

@ -34,12 +34,11 @@ setup(
zip_safe=False, zip_safe=False,
install_requires=[ install_requires=[
"boto3", "boto3",
"bugout>=0.2.8", "bugout>=0.2.12",
"chardet", "chardet",
"fastapi", "fastapi",
"moonstreamdb>=0.3.4", "moonstreamdb>=0.3.4",
"moonstream>=0.1.1", "moonstream>=0.1.1",
"moonstream-entity>=0.0.5",
"moonworm[moonstream]>=0.6.2", "moonworm[moonstream]>=0.6.2",
"humbug", "humbug",
"pydantic==1.9.2", "pydantic==1.9.2",