Merge branch 'main' into reorg-metatx-db

pull/882/head
kompotkot 2023-08-03 10:58:41 +00:00
commit 2d93c307ab
8 zmienionych plików z 64 dodań i 73 usunięć

Wyświetl plik

@ -82,7 +82,7 @@ def get_entity_subscription_collection_id(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
) -> Optional[str]:
) -> str:
"""
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
"""

Wyświetl plik

@ -9,41 +9,39 @@ from typing import Any, Dict, List
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource
from entity.data import EntityResponse # type: ignore
from bugout.data import BugoutJournalEntity, BugoutResource
from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
get_label_model,
get_transaction_model,
)
from sqlalchemy import text
from .actions import (
generate_s3_access_links,
query_parameter_hash,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
)
from . import data
from .actions import (
EntityCollectionNotFoundException,
generate_s3_access_links,
get_entity_subscription_collection_id,
query_parameter_hash,
)
from .middleware import MoonstreamHTTPException
from .settings import (
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
DOCS_TARGET_PATH,
LINKS_EXPIRATION_TIME,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
ORIGINS,
LINKS_EXPIRATION_TIME,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
)
from .settings import bugout_client as bc, entity_client as ec
from .settings import bugout_client as bc
from .stats_worker import dashboard, queries
from .version import MOONCRAWL_VERSION
@ -115,12 +113,11 @@ async def status_handler(
)
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=UUID(stats_update.user_id),
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
@ -136,20 +133,19 @@ async def status_handler(
s3_client = boto3.client("s3")
subscription_by_id: Dict[str, EntityResponse] = {}
subscription_by_id: Dict[str, BugoutJournalEntity] = {}
for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings"
]:
# get subscription by id
subscription: EntityResponse = ec.get_entity(
subscription: BugoutJournalEntity = bc.get_entity(
token=stats_update.token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=dashboard_subscription_filters["subscription_id"],
)
subscription_by_id[str(subscription.entity_id)] = subscription
subscription_by_id[str(subscription.id)] = subscription
try:
background_tasks.add_task(
@ -182,7 +178,7 @@ async def status_handler(
subscriprions_type = reqired_field["subscription_type_id"]
for timescale in stats_update.timescales:
presigned_urls_response[subscription_entity.entity_id] = {}
presigned_urls_response[subscription_entity.id] = {}
try:
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscriprions_type]}/contracts_data/{subscription_entity.address}/{stats_update.dashboard_id}/v1/{timescale}.json"
@ -201,7 +197,7 @@ async def status_handler(
HttpMethod="GET",
)
presigned_urls_response[subscription_entity.entity_id][timescale] = {
presigned_urls_response[subscription_entity.id][timescale] = {
"url": stats_presigned_url,
"headers": {
"If-Modified-Since": (

Wyświetl plik

@ -2,11 +2,11 @@ import argparse
import json
import logging
import os
from typing import Any, Dict
from typing import cast, List
import uuid
import requests # type: ignore
from bugout.data import BugoutSearchResult
from .utils import get_results_for_moonstream_query
from ..settings import (
@ -49,17 +49,18 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
limit=100,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
leaderboards_results = cast(List[BugoutSearchResult], leaderboards.results)
except Exception as e:
logger.error(f"Could not get leaderboards from journal: {e}")
return
if len(leaderboards.results) == 0:
if len(leaderboards_results) == 0:
logger.error("No leaderboard found")
return
logger.info(f"Found {len(leaderboards.results)} leaderboards")
logger.info(f"Found {len(leaderboards_results)} leaderboards")
for leaderboard in leaderboards.results:
for leaderboard in leaderboards_results:
logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
)

Wyświetl plik

@ -195,7 +195,7 @@ def get_crawl_job_entries(
query += f" created_at:>={created_at_filter}"
current_offset = 0
entries = []
entries: List[BugoutSearchResult] = []
while True:
search_result = bugout_client.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -205,10 +205,11 @@ def get_crawl_job_entries(
limit=limit,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
entries.extend(search_result.results)
search_results = cast(List[BugoutSearchResult], search_result.results)
entries.extend(search_results)
# if len(entries) >= search_result.total_results:
if len(search_result.results) == 0:
if len(search_results) == 0:
break
current_offset += limit
return entries
@ -402,8 +403,9 @@ def _get_heartbeat_entry_id(
limit=1,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
if entries.results:
return entries.results[0].entry_url.split("/")[-1]
search_results = cast(List[BugoutSearchResult], entries.results)
if search_results:
return search_results[0].entry_url.split("/")[-1]
else:
logger.info(f"No {crawler_type} heartbeat entry found, creating one")
entry = bugout_client.create_entry(

Wyświetl plik

@ -3,10 +3,8 @@ from typing import Dict, Optional
from uuid import UUID
from bugout.app import Bugout
from entity.client import Entity # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
@ -14,15 +12,6 @@ BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev"
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
# Entity
MOONSTREAM_ENTITY_URL = os.environ.get("MOONSTREAM_ENTITY_URL", "")
if MOONSTREAM_ENTITY_URL == "":
raise ValueError("MOONSTREAM_ENTITY_URL environment variable must be set")
entity_client = Entity(MOONSTREAM_ENTITY_URL)
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"

Wyświetl plik

@ -6,21 +6,25 @@ import hashlib
import json
import logging
import time
import traceback
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, cast, Dict, List, Optional, Union
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from entity.data import EntityResponse, EntityCollectionResponse # type: ignore
from bugout.data import (
BugoutJournalEntity,
BugoutResource,
BugoutResources,
BugoutSearchResultAsEntity,
)
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
get_transaction_model,
)
from sqlalchemy import and_, cast, distinct, extract, func, text
from sqlalchemy import and_, distinct, extract, func, text
from sqlalchemy import cast as sqlalchemy_cast
from sqlalchemy.orm import Session
from sqlalchemy.sql.operators import in_op
from web3 import Web3
@ -29,14 +33,14 @@ from ..blockchain import connect
from ..db import yield_db_read_only_session_ctx
from ..reporter import reporter
from ..settings import (
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
CRAWLER_LABEL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
)
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import bugout_client as bc
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -157,11 +161,13 @@ def generate_data(
.filter(in_op(label_model.label_data["name"].astext, functions))
.filter(
label_model.block_timestamp
>= cast(extract("epoch", start), label_model.block_timestamp.type)
>= sqlalchemy_cast(
extract("epoch", start), label_model.block_timestamp.type
)
)
.filter(
label_model.block_timestamp
< cast(
< sqlalchemy_cast(
extract("epoch", (start + timescales_delta[timescale]["timedelta"])),
label_model.block_timestamp.type,
)
@ -652,25 +658,26 @@ def stats_generate_handler(args: argparse.Namespace):
address_dashboard_id_subscription_id_tree: Dict[str, Any] = {}
for user_id, collection_id in user_collection_by_id.items():
for user_id, journal_id in user_collection_by_id.items():
# request all subscriptions for user
user_subscriptions: EntityCollectionResponse = ec.search_entities(
user_subscriptions = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
required_field=[
"subscription_type_id:{}".format(
subscription_id_by_blockchain[args.blockchain]
)
],
journal_id=journal_id,
query=f"tag:subscription_type_id:{subscription_id_by_blockchain[args.blockchain]}",
representation="entity",
)
user_subscriptions_results = cast(
List[BugoutSearchResultAsEntity], user_subscriptions.results
)
logger.info(
f"Amount of user subscriptions: {len(user_subscriptions.entities)}"
f"Amount of user subscriptions: {len(user_subscriptions_results)}"
)
for subscription in user_subscriptions.entities:
subscription_id = str(subscription.entity_id)
for subscription in user_subscriptions_results:
entity_url_list = subscription.entity_url.split("/")
subscription_id = entity_url_list[len(entity_url_list) - 1]
if subscription_id not in dashboards_by_subscription:
logger.info(
@ -1014,7 +1021,7 @@ def stats_generate_handler(args: argparse.Namespace):
def stats_generate_api_task(
timescales: List[str],
dashboard: BugoutResource,
subscription_by_id: Dict[str, EntityResponse],
subscription_by_id: Dict[str, BugoutJournalEntity],
access_id: Optional[UUID] = None,
):
"""

Wyświetl plik

@ -4,9 +4,6 @@ export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>"
# Entity environment variables
export MOONSTREAM_ENTITY_URL="https://api.moonstream.to/entity"
# Engine environment variables
export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to"

Wyświetl plik

@ -34,12 +34,11 @@ setup(
zip_safe=False,
install_requires=[
"boto3",
"bugout>=0.2.8",
"bugout>=0.2.12",
"chardet",
"fastapi",
"moonstreamdb>=0.3.4",
"moonstream>=0.1.1",
"moonstream-entity>=0.0.5",
"moonworm[moonstream]>=0.6.2",
"humbug",
"pydantic==1.9.2",