Moonstream API migration to spire entity

pull/875/head
kompotkot 2023-07-31 12:32:19 +00:00
rodzic 330a7feb3f
commit a3b0841d10
9 zmienionych plików z 236 dodań i 261 usunięć

Wyświetl plik

@ -1,28 +1,27 @@
from collections import OrderedDict
import hashlib
import json
from itertools import chain
import logging
from typing import List, Optional, Dict, Any, Union
from enum import Enum
import uuid
from collections import OrderedDict
from enum import Enum
from itertools import chain
from typing import Any, Dict, List, Optional, Union
import boto3 # type: ignore
from bugout.data import (
BugoutSearchResults,
BugoutSearchResult,
BugoutJournal,
BugoutJournals,
BugoutResource,
BugoutResources,
BugoutSearchResult,
BugoutSearchResults,
)
from bugout.journal import SearchOrder
from bugout.exceptions import BugoutResponseException
from entity.data import EntityCollectionsResponse, EntityCollectionResponse # type: ignore
from entity.exceptions import EntityUnexpectedResponse # type: ignore
from bugout.journal import SearchOrder
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore
from moonstreamdb.models import EthereumLabel
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.models import EthereumLabel
from slugify import slugify # type: ignore
from sqlalchemy import text
from sqlalchemy.orm import Session
@ -32,24 +31,20 @@ from web3._utils.validation import validate_abi
from . import data
from .middleware import MoonstreamHTTPException
from .reporter import reporter
from .selectors_storage import selectors
from .settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
ETHERSCAN_SMARTCONTRACTS_BUCKET,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
support_interfaces,
supportsInterface_abi,
multicall_contracts,
)
from .settings import bugout_client as bc, entity_client as ec
from .web3_provider import multicall, FunctionSignature, connect
from .selectors_storage import selectors
from .settings import bugout_client as bc
from .settings import multicall_contracts, support_interfaces, supportsInterface_abi
from .web3_provider import FunctionSignature, connect, multicall
logger = logging.getLogger(__name__)
@ -88,9 +83,9 @@ class ResourceQueryFetchException(Exception):
"""
class EntityCollectionNotFoundException(Exception):
class EntityJournalNotFoundException(Exception):
"""
Raised when entity collection is not found
Raised when journal (collection prev.) with entities not found.
"""
@ -641,14 +636,14 @@ def get_query_by_name(query_name: str, token: uuid.UUID) -> str:
return query_id
def get_entity_subscription_collection_id(
def get_entity_subscription_journal_id(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
create_if_not_exist: bool = False,
) -> Optional[str]:
"""
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
Get collection_id (journal_id) from brood resources. If journal not exist and create_if_not_exist is True
"""
params = {
@ -668,52 +663,49 @@ def get_entity_subscription_collection_id(
if len(resources.resources) == 0:
if not create_if_not_exist:
raise EntityCollectionNotFoundException(
"Subscription collection not found."
)
collection_id = generate_collection_for_user(resource_type, token, user_id)
raise EntityJournalNotFoundException("Subscription journal not found.")
journal_id = generate_journal_for_user(resource_type, token, user_id)
return collection_id
return journal_id
else:
resource = resources.resources[0]
return resource.resource_data["collection_id"]
def generate_collection_for_user(
def generate_journal_for_user(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
) -> str:
try:
# try get collection
# Try get journal
collections: EntityCollectionsResponse = ec.list_collections(token=token)
journals: BugoutJournals = bc.list_journals(token=token)
available_collections: Dict[str, str] = {
collection.name: collection.collection_id
for collection in collections.collections
available_journals: Dict[str, str] = {
journal.name: journal.id for journal in journals.journals
}
subscription_collection_name = f"subscriptions_{user_id}"
subscription_journal_name = f"subscriptions_{user_id}"
if subscription_collection_name not in available_collections:
collection: EntityCollectionResponse = ec.add_collection(
token=token, name=subscription_collection_name
if subscription_journal_name not in available_journals:
journal: BugoutJournal = bc.create_journal(
token=token, name=subscription_journal_name
)
collection_id = collection.collection_id
journal_id = journal.id
else:
collection_id = available_collections[subscription_collection_name]
except EntityUnexpectedResponse as e:
logger.error(f"Error create collection, error: {str(e)}")
journal_id = available_journals[subscription_journal_name]
except Exception as e:
logger.error(f"Error create journal, error: {str(e)}")
raise MoonstreamHTTPException(
status_code=500, detail="Can't create collection for subscriptions"
status_code=500, detail="Can't create journal for subscriptions"
)
resource_data = {
"type": resource_type,
"user_id": str(user_id),
"collection_id": str(collection_id),
"collection_id": str(journal_id),
}
try:
@ -727,14 +719,14 @@ def generate_collection_for_user(
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
logger.error(
f"Required create resource data: {resource_data}, and grand access to journal: {collection_id}, for user: {user_id}"
f"Required create resource data: {resource_data}, and grand access to journal: {journal_id}, for user: {user_id}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
try:
bc.update_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
journal_id=journal_id,
holder_type="user",
holder_id=user_id,
permission_list=[
@ -746,16 +738,16 @@ def generate_collection_for_user(
],
)
logger.info(
f"Grand access to journal: {collection_id}, for user: {user_id} successfully"
f"Grand access to journal: {journal_id}, for user: {user_id} successfully"
)
except Exception as e:
logger.error(f"Error updating journal scopes: {str(e)}")
logger.error(
f"Required grand access to journal: {collection_id}, for user: {user_id}"
f"Required grand access to journal: {journal_id}, for user: {user_id}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return collection_id
return journal_id
def generate_s3_access_links(

Wyświetl plik

@ -2,65 +2,65 @@
Generate entity subscriptions from existing brood resources subscriptions
"""
import hashlib
import logging
import json
import logging
import os
import traceback
from typing import List, Optional, Dict, Any, Union, Tuple
import uuid
import time
from typing import Any, Dict, List, Optional, Tuple, Union
import boto3 # type: ignore
from bugout.data import BugoutResources, BugoutResource
from bugout.exceptions import BugoutResponseException
from entity.exceptions import EntityUnexpectedResponse # type: ignore
from entity.data import EntityCollectionResponse, EntityResponse # type: ignore
from bugout.data import (
BugoutJournal,
BugoutJournalEntity,
BugoutResource,
BugoutResources,
)
from bugout.exceptions import BugoutResponseException, BugoutUnexpectedResponse
from ...settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_APPLICATION_ID,
BUGOUT_RESOURCE_TYPE_DASHBOARD,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
)
from ...settings import bugout_client as bc, entity_client as ec
from ...settings import bugout_client as bc
from ..subscription_types import CANONICAL_SUBSCRIPTION_TYPES
logger = logging.getLogger(__name__)
### create collection for user
### Create journal for user
def create_collection_for_user(user_id: uuid.UUID) -> str:
def create_journal_for_user(user_id: uuid.UUID) -> str:
"""
Create collection for user if not exist
Create journal (collection) for user if not exist
"""
try:
# try get collection
collection: EntityCollectionResponse = ec.add_collection(
# Try to get journal
journal: BugoutJournal = bc.create_journal(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, name=f"subscriptions_{user_id}"
)
collection_id = collection.collection_id
except EntityUnexpectedResponse as e:
logger.error(f"Error create collection, error: {str(e)}")
return str(collection_id)
journal_id = journal.id
except BugoutUnexpectedResponse as e:
logger.error(f"Error create journal, error: {str(e)}")
return str(journal_id)
def add_entity_subscription(
user_id: uuid.UUID,
subscription_type_id: str,
collection_id: str,
journal_id: str,
address: str,
color: str,
label: str,
content: Dict[str, Any],
) -> EntityResponse:
) -> BugoutJournalEntity:
"""
Add subscription to collection
Add subscription to journal (collection).
"""
if subscription_type_id not in CANONICAL_SUBSCRIPTION_TYPES:
@ -73,12 +73,12 @@ def add_entity_subscription(
f"Subscription type ID {subscription_type_id} is not a blockchain subscription type."
)
entity = ec.add_entity(
entity = bc.create_entity(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
journal_id=journal_id,
address=address,
blockchain=CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain,
name=label,
title=label,
required_fields=[
{"type": "subscription"},
{"subscription_type_id": f"{subscription_type_id}"},
@ -105,34 +105,19 @@ def get_abi_from_s3(s3_path: str, bucket: str):
logger.error(f"Error get ABI from S3: {str(e)}")
def revoke_collection_permissions_from_user(
user_id: uuid.UUID, collection_id: str, permissions: List[str]
):
"""
Remove all permissions from user
"""
bc.delete_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
holder_type="user",
holder_id=user_id,
permission_list=permissions,
)
def find_user_collection(
def find_user_journal(
user_id: uuid.UUID,
create_if_not_exists: bool = False,
) -> Tuple[Optional[str], Optional[str]]:
"""
Find user collection in Brood resources
Can create new collection if not exists and create_if_not_exists = True
Find user journal (collection) in Brood resources
Can create new journal (collection) if not exists and create_if_not_exists = True
"""
params = {
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
"user_id": str(user_id),
}
logger.info(f"Looking for collection for user {user_id}")
logger.info(f"Looking for journal (collection) for user {user_id}")
try:
user_entity_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params
@ -147,18 +132,16 @@ def find_user_collection(
)
if len(user_entity_resources.resources) > 0:
collection_id = user_entity_resources.resources[0].resource_data[
"collection_id"
]
journal_id = user_entity_resources.resources[0].resource_data["collection_id"]
logger.info(
f"Collection found for user {user_id}. collection_id: {collection_id}"
f"Journal (collection) found for user {user_id}. journal_id: {journal_id}"
)
return collection_id, str(user_entity_resources.resources[0].id)
return journal_id, str(user_entity_resources.resources[0].id)
elif create_if_not_exists:
# Create collection new collection for user
logger.info(f"Creating new collection")
collection = create_collection_for_user(user_id)
return collection, None
# Create new journal for user
logger.info(f"Creating new journal (collection)")
journal_id = create_journal_for_user(user_id)
return journal_id, None
return None, None
@ -223,33 +206,35 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
logger.info(f"parsed users: {len(users_subscriptions)}")
### Create collections and add subscriptions
### Create journals (collections) and add subscriptions
try:
for user_id, subscriptions in users_subscriptions.items():
user_id = str(user_id)
collection_id = None
journal_id = None
resource_id_of_user_collection = None
### Collection can already exist in stages.json
### Journal can already exist in stages.json
if "collection_id" in stages[user_id]:
collection_id = stages[user_id]["collection_id"]
journal_id = stages[user_id]["collection_id"]
if "subscription_resource_id" in stages[user_id]:
resource_id_of_user_collection = stages[user_id][
"subscription_resource_id"
]
else:
### look for collection in brood resources
collection_id, resource_id_of_user_collection = find_user_collection(
journal_id, resource_id_of_user_collection = find_user_journal(
user_id, create_if_not_exists=True
)
if collection_id is None:
logger.info(f"Collection not found or create for user {user_id}")
if journal_id is None:
logger.info(
f"Journal (collection) not found or create for user {user_id}"
)
continue
stages[user_id]["collection_id"] = collection_id
stages[user_id]["collection_id"] = journal_id
# Create user subscription collection resource
@ -262,7 +247,7 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
resource_data = {
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
"user_id": str(user_id),
"collection_id": str(collection_id),
"collection_id": str(journal_id),
"version": "1.0.0",
}
@ -318,11 +303,11 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
# Add subscription to collection
logger.info(f"Add subscription to collection: {collection_id}")
logger.info(f"Add subscription to journal (collection): {journal_id}")
entity = add_entity_subscription(
user_id=user_id,
collection_id=collection_id,
journal_id=journal_id,
subscription_type_id=subscription_type_id,
address=address,
color=color,
@ -331,7 +316,7 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
)
stages[user_id]["processed_subscriptions"][
str(subscription["subscription_id"])
] = {"entity_id": str(entity.entity_id), "dashboard_ids": []}
] = {"entity_id": str(entity.id), "dashboard_ids": []}
# Add permissions to user
@ -342,7 +327,7 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
try:
bc.update_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
journal_id=journal_id,
holder_type="user",
holder_id=user_id,
permission_list=[
@ -361,12 +346,12 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
continue
else:
logger.warn(
f"User {user_id} == {admin_user_id} permissions not changed. Unexpected behaivior!"
f"User {user_id} == {admin_user_id} permissions not changed. Unexpected behavior!"
)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess user subscriptions: {str(e)}")
logger.error(f"Failed to process user subscriptions: {str(e)}")
finally:
try:
with open("stages.json", "w") as f:
@ -561,18 +546,18 @@ def delete_generated_entity_subscriptions_from_brood_resources():
logger.info(f"parsed users: {len(users_subscriptions)}")
### Create collections and add subscriptions
### Create journals and add subscriptions
try:
for user_id, _ in users_subscriptions.items():
user_id = str(user_id)
collection_id = None
journal_id = None
resource_id_of_user_collection = None
### Collection can already exist in stages.json
if "collection_id" in stages[user_id]:
collection_id = stages[user_id]["collection_id"]
journal_id = stages[user_id]["collection_id"]
if "subscription_resource_id" in stages[user_id]:
resource_id_of_user_collection = stages[user_id][
@ -581,35 +566,37 @@ def delete_generated_entity_subscriptions_from_brood_resources():
else:
### look for collection in brood resources
collection_id, resource_id_of_user_collection = find_user_collection(
journal_id, resource_id_of_user_collection = find_user_journal(
user_id, create_if_not_exists=False
)
if collection_id is None:
if journal_id is None:
logger.info(f"Collection not found or create for user {user_id}")
continue
### Delete collection
try:
ec.delete_collection(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, collection_id=collection_id
bc.delete_journal(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=journal_id
)
logger.info(f"Collection deleted {collection_id}")
logger.info(f"Journal (collection) deleted {journal_id}")
except Exception as e:
logger.error(f"Failed to delete collection: {str(e)}")
logger.error(f"Failed to delete journal (collection): {str(e)}")
### Delete collection resource
try:
logger.info(f"Collection resource id {resource_id_of_user_collection}")
logger.info(
f"Journal (collection) resource id {resource_id_of_user_collection}"
)
bc.delete_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=resource_id_of_user_collection,
)
logger.info(
f"Collection resource deleted {resource_id_of_user_collection}"
f"Journal (collection) resource deleted {resource_id_of_user_collection}"
)
# clear stages
@ -617,12 +604,14 @@ def delete_generated_entity_subscriptions_from_brood_resources():
stages[user_id] = {}
except Exception as e:
logger.error(f"Failed to delete collection resource: {str(e)}")
logger.error(
f"Failed to delete journal (collection) resource: {str(e)}"
)
continue
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess user subscriptions: {str(e)}")
logger.error(f"Failed to process user subscriptions: {str(e)}")
def restore_dashboard_state():
@ -659,7 +648,7 @@ def restore_dashboard_state():
dashboards_by_user[user_id].append(dashboard)
### Retunr all dashboards to old state
### Return all dashboards to old state
logger.info(f"Amount of users: {len(dashboards_by_user)}")
@ -738,41 +727,41 @@ def fix_duplicates_keys_in_entity_subscription():
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
# get collection ids from that resources
# get journal ids from that resources
collection_id_user_id_mappig = {}
collection_id_user_id_mapping = {}
for subscription in subscriptions.resources:
if "collection_id" in subscription.resource_data:
if (
subscription.resource_data["collection_id"]
not in collection_id_user_id_mappig
not in collection_id_user_id_mapping
):
collection_id_user_id_mappig[
collection_id_user_id_mapping[
subscription.resource_data["collection_id"]
] = subscription.resource_data["user_id"]
else:
raise Exception(
f"Duplicate collection_id {subscription.resource_data['collection_id']} in subscriptions"
)
# go through all collections and fix entities.
# go through all journals and fix entities.
# Will creating one new entity with same data but without "type:subscription" in required_fields
for collection_id, user_id in collection_id_user_id_mappig.items():
# get collection entities
collection_entities = ec.search_entities(
for journal_id, user_id in collection_id_user_id_mapping.items():
# get journal entities
journal_entities = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
journal_id=journal_id,
required_field=[f"type:subscription"],
limit=1000,
representation="entity",
)
logger.info(
f"Amount of entities in user: {user_id} collection {collection_id}: {len(collection_entities.entities)}"
f"Amount of entities in user: {user_id} journal (collection) {journal_id}: {len(journal_entities.entities)}"
)
for entity in collection_entities.entities:
for entity in journal_entities.entities:
# get entity data
if entity.secondary_fields is None:
@ -813,43 +802,43 @@ def fix_duplicates_keys_in_entity_subscription():
)
new_required_fields.append({"entity_id": str(entity_id)})
new_entity = ec.add_entity(
new_entity = bc.create_entity(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
journal_id=journal_id,
blockchain=entity.blockchain,
address=entity.address,
name=entity.name,
title=entity.title,
required_fields=new_required_fields,
secondary_fields=entity.secondary_fields,
)
logger.info(
f"Entity {new_entity.entity_id} created successfully for collection {collection_id}"
f"Entity {new_entity.entity_id} created successfully for journal (collection) {journal_id}"
)
except Exception as e:
logger.error(
f"Failed to create entity {entity_id} for collection {collection_id}: {str(e)}, user_id: {user_id}"
f"Failed to create entity {entity_id} for journal (collection) {journal_id}: {str(e)}, user_id: {user_id}"
)
continue
# Update old entity without secondary_fields duplicate
try:
ec.update_entity(
bc.update_entity(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
journal_id=journal_id,
entity_id=entity_id,
blockchain=entity.blockchain,
address=entity.address,
name=entity.name,
title=entity.title,
required_fields=entity.required_fields,
secondary_fields=secondary_fields,
)
logger.info(
f"Entity {entity_id} updated successfully for collection {collection_id}"
f"Entity {entity_id} updated successfully for journal (collection) {journal_id}"
)
except Exception as e:
logger.error(
f"Failed to update entity {entity_id} for collection {collection_id}: {str(e)}, user_id: {user_id}"
f"Failed to update entity {entity_id} for journal (collection) {journal_id}: {str(e)}, user_id: {user_id}"
)

Wyświetl plik

@ -1,25 +1,23 @@
import argparse
from collections import Counter
import json
import logging
import textwrap
from typing import Any, Dict
from bugout.data import BugoutResources
from bugout.exceptions import BugoutResponseException
from moonstream.client import Moonstream # type: ignore
import logging
from typing import Dict, Any
import textwrap
from sqlalchemy import text
from ..actions import get_all_entries_from_search, name_normalization
from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_QUERIES_JOURNAL_ID,
MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE,
)
from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE
from ..actions import get_all_entries_from_search, name_normalization
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -1,10 +1,10 @@
"""
Pydantic schemas for the Moonstream HTTP API
"""
from datetime import datetime
import json
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union, Literal
from typing import Any, Dict, List, Literal, Optional, Union
from uuid import UUID
from xmlrpc.client import Boolean
@ -12,7 +12,6 @@ from fastapi import Form
from pydantic import BaseModel, Field, validator
from sqlalchemy import false
USER_ONBOARDING_STATE = "onboarding_state"
BUGOUT_RESOURCE_QUERY_RESOLVER = "query_name_resolver"
@ -244,7 +243,7 @@ class OnboardingState(BaseModel):
steps: Dict[str, int]
class SubdcriptionsAbiResponse(BaseModel):
class SubscriptionsAbiResponse(BaseModel):
abi: str

Wyświetl plik

@ -1,6 +1,5 @@
import json
import logging
from os import read
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
@ -16,15 +15,16 @@ from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_CRAWLERS_SERVER_PORT,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
)
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import bugout_client as bc
from ..settings import entity_client as ec
logger = logging.getLogger(__name__)
@ -52,19 +52,20 @@ async def add_dashboard_handler(
subscription_settings = dashboard.subscription_settings
# Get user collection id
# Get user journal (collection) id
collection_id = actions.get_entity_subscription_collection_id(
journal_id = actions.get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
subscriprions_list = bc.search(
token=token,
collection_id=collection_id,
journal_id=journal_id,
required_field=[f"type:subscription"],
limit=1000,
representation="entity",
)
# process existing subscriptions with supplied ids
@ -137,7 +138,7 @@ async def add_dashboard_handler(
tags=["subscriptions"],
response_model=BugoutResource,
)
async def delete_subscription_handler(request: Request, dashboard_id: str):
async def delete_subscription_handler(request: Request, dashboard_id: str = Path(...)):
"""
Delete subscriptions.
"""
@ -181,9 +182,9 @@ async def get_dashboards_handler(
return resources
@router.get("/{dashboarsd_id}", tags=["dashboards"], response_model=BugoutResource)
@router.get("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource)
async def get_dashboard_handler(
request: Request, dashboarsd_id: UUID
request: Request, dashboard_id: UUID = Path(...)
) -> BugoutResource:
"""
Get user's subscriptions.
@ -193,7 +194,7 @@ async def get_dashboard_handler(
try:
resource: BugoutResource = bc.get_resource(
token=token,
resource_id=dashboarsd_id,
resource_id=dashboard_id,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except BugoutResponseException as e:
@ -211,7 +212,7 @@ async def get_dashboard_handler(
@router.put("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource)
async def update_dashboard_handler(
request: Request,
dashboard_id: str,
dashboard_id: str = Path(...),
dashboard: data.DashboardUpdate = Body(...),
) -> BugoutResource:
"""
@ -224,19 +225,20 @@ async def update_dashboard_handler(
subscription_settings = dashboard.subscription_settings
# Get user collection id
# Get user journal (collection) id
collection_id = actions.get_entity_subscription_collection_id(
journal_id = actions.get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
subscriprions_list = bc.search(
token=token,
collection_id=collection_id,
journal_id=journal_id,
required_field=[f"type:subscription"],
limit=1000,
representation="entity",
)
available_subscriptions_ids: Dict[Union[UUID, str], EntityResponse] = {
@ -301,7 +303,7 @@ async def update_dashboard_handler(
@router.get("/{dashboard_id}/stats", tags=["dashboards"])
async def get_dashboard_data_links_handler(
request: Request, dashboard_id: str
request: Request, dashboard_id: str = Path(...)
) -> Dict[Union[UUID, str], Any]:
"""
Get s3 presign urls for dashboard grafics
@ -328,17 +330,18 @@ async def get_dashboard_data_links_handler(
# get subscriptions
collection_id = actions.get_entity_subscription_collection_id(
journal_id = actions.get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
subscriprions_list = bc.search(
token=token,
collection_id=collection_id,
journal_id=journal_id,
required_field=[f"type:subscription"],
limit=1000,
representation="entity",
)
# filter out dasboards

Wyświetl plik

@ -1,42 +1,38 @@
"""
The Moonstream subscriptions HTTP API
"""
from concurrent.futures import as_completed, ProcessPoolExecutor, ThreadPoolExecutor
import hashlib
import json
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
from bugout.exceptions import BugoutResponseException
from bugout.data import BugoutSearchResult
from fastapi import APIRouter, Depends, Request, Form, BackgroundTasks
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, BackgroundTasks, Depends, Form, Path, Query, Request
from moonstreamdb.blockchain import AvailableBlockchainType
from web3 import Web3
from .. import data
from ..actions import (
AddressNotSmartContractException,
validate_abi_json,
EntityJournalNotFoundException,
apply_moonworm_tasks,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
check_if_smart_contract,
get_entity_subscription_journal_id,
get_list_of_support_interfaces,
validate_abi_json,
)
from ..admin import subscription_types
from .. import data
from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_ENTITIES_RESERVED_TAGS,
THREAD_TIMEOUT_SECONDS,
)
from ..web3_provider import (
yield_web3_provider,
)
from ..settings import bugout_client as bc
from ..web3_provider import yield_web3_provider
logger = logging.getLogger(__name__)
@ -160,36 +156,35 @@ async def add_subscription_handler(
required_fields.extend(allowed_required_fields)
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
create_if_not_exist=True,
)
entity = ec.add_entity(
entity = bc.create_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
address=address,
blockchain=subscription_types.CANONICAL_SUBSCRIPTION_TYPES[
subscription_type_id
].blockchain,
name=label,
title=label,
required_fields=required_fields,
secondary_fields=content,
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
logger.error(f"Failed to get collection id")
logger.error(f"Failed to get journal id")
raise MoonstreamHTTPException(
status_code=500,
internal_error=e,
detail="Currently unable to get collection id",
detail="Currently unable to get journal id",
)
normalized_entity_tags = [
@ -200,7 +195,7 @@ async def add_subscription_handler(
]
return data.SubscriptionResourceData(
id=str(entity.entity_id),
id=str(entity.id),
user_id=str(user.id),
address=address,
color=color,
@ -219,28 +214,29 @@ async def add_subscription_handler(
tags=["subscriptions"],
response_model=data.SubscriptionResourceData,
)
async def delete_subscription_handler(request: Request, subscription_id: str):
async def delete_subscription_handler(
request: Request, subscription_id: str = Path(...)
):
"""
Delete subscriptions.
"""
token = request.state.token
user = request.state.user
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
deleted_entity = ec.delete_entity(
deleted_entity = bc.delete_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
@ -272,7 +268,7 @@ async def delete_subscription_handler(request: Request, subscription_id: str):
abi = deleted_entity.secondary_fields.get("abi")
return data.SubscriptionResourceData(
id=str(deleted_entity.entity_id),
id=str(deleted_entity.id),
user_id=str(user.id),
address=deleted_entity.address,
color=color,
@ -289,8 +285,8 @@ async def delete_subscription_handler(request: Request, subscription_id: str):
@router.get("/", tags=["subscriptions"], response_model=data.SubscriptionsListResponse)
async def get_subscriptions_handler(
request: Request,
limit: Optional[int] = 10,
offset: Optional[int] = 0,
limit: Optional[int] = Query(10),
offset: Optional[int] = Query(0),
) -> data.SubscriptionsListResponse:
"""
Get user's subscriptions.
@ -298,25 +294,24 @@ async def get_subscriptions_handler(
token = request.state.token
user = request.state.user
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
create_if_not_exist=True,
)
subscriprions_list = ec.search_entities(
subscriptions_list = bc.search(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
journal_id=journal_id,
query="tag:type:subscription",
limit=limit,
offset=offset,
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
@ -328,7 +323,7 @@ async def get_subscriptions_handler(
subscriptions = []
for subscription in subscriprions_list.entities:
for subscription in subscriptions_list.entities:
tags = subscription.required_fields
label, color, subscription_type_id = None, None, None
@ -352,7 +347,7 @@ async def get_subscriptions_handler(
subscriptions.append(
data.SubscriptionResourceData(
id=str(subscription.entity_id),
id=str(subscription.id),
user_id=str(user.id),
address=subscription.address,
color=color,
@ -378,8 +373,8 @@ async def get_subscriptions_handler(
)
async def update_subscriptions_handler(
request: Request,
subscription_id: str,
background_tasks: BackgroundTasks,
subscription_id: str = Path(...),
) -> data.SubscriptionResourceData:
"""
Get user's subscriptions.
@ -401,16 +396,16 @@ async def update_subscriptions_handler(
tags = form_data.tags
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
# get subscription entity
subscription_entity = ec.get_entity(
subscription_entity = bc.get_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
)
@ -430,17 +425,17 @@ async def update_subscriptions_handler(
if not subscription_type_id:
logger.error(
f"Subscription entity {subscription_id} in collection {collection_id} has no subscription_type_id malformed subscription entity"
f"Subscription entity {subscription_id} in journal (collection) {journal_id} has no subscription_type_id malformed subscription entity"
)
raise MoonstreamHTTPException(
status_code=409,
detail="Not valid subscription entity",
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
@ -488,17 +483,16 @@ async def update_subscriptions_handler(
if allowed_required_fields:
update_required_fields.extend(allowed_required_fields)
try:
subscription = ec.update_entity(
subscription = bc.update_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
title=subscription_entity.title,
address=subscription_entity.address,
blockchain=subscription_entity.blockchain,
name=subscription_entity.name,
required_fields=update_required_fields,
secondary_fields=update_secondary_fields,
)
except Exception as e:
logger.error(f"Error update user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -519,7 +513,7 @@ async def update_subscriptions_handler(
]
return data.SubscriptionResourceData(
id=str(subscription.entity_id),
id=str(subscription.id),
user_id=str(user.id),
address=subscription.address,
color=color,
@ -536,33 +530,33 @@ async def update_subscriptions_handler(
@router.get(
"/{subscription_id}/abi",
tags=["subscriptions"],
response_model=data.SubdcriptionsAbiResponse,
response_model=data.SubscriptionsAbiResponse,
)
async def get_subscription_abi_handler(
request: Request,
subscription_id: str,
) -> data.SubdcriptionsAbiResponse:
subscription_id: str = Path(...),
) -> data.SubscriptionsAbiResponse:
token = request.state.token
user = request.state.user
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
# get subscription entity
subscription_resource = ec.get_entity(
subscription_resource = bc.get_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
@ -572,7 +566,7 @@ async def get_subscription_abi_handler(
if "abi" not in subscription_resource.secondary_fields.keys():
raise MoonstreamHTTPException(status_code=404, detail="Abi not found")
return data.SubdcriptionsAbiResponse(
return data.SubscriptionsAbiResponse(
abi=subscription_resource.secondary_fields["abi"]
)
@ -605,7 +599,7 @@ async def list_subscription_types() -> data.SubscriptionTypesListResponse:
tags=["subscriptions"],
response_model=data.ContractInfoResponse,
)
async def address_info(request: Request, address: str):
async def address_info(request: Request, address: str = Query(...)):
"""
Looking if address is contract
"""
@ -668,8 +662,8 @@ async def address_info(request: Request, address: str):
)
def get_contract_interfaces(
request: Request,
address: str,
blockchain: str,
address: str = Query(...),
blockchain: str = Query(...),
):
"""
Request contract interfaces from web3

Wyświetl plik

@ -10,7 +10,7 @@ from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Form, Request
from .. import data
from ..actions import create_onboarding_resource, generate_collection_for_user
from ..actions import create_onboarding_resource
from ..middleware import MoonstreamHTTPException
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_APPLICATION_ID
from ..settings import bugout_client as bc

Wyświetl plik

@ -9,7 +9,7 @@ base58==2.1.1
bitarray==2.6.0
boto3==1.26.5
botocore==1.29.5
bugout>=0.2.10
bugout>=0.2.12
certifi==2022.9.24
charset-normalizer==2.1.1
click==8.1.3

Wyświetl plik

@ -13,12 +13,12 @@ setup(
install_requires=[
"appdirs",
"boto3",
"bugout>=0.2.10",
"bugout>=0.2.12",
"moonstream-entity>=0.0.5",
"fastapi",
"moonstreamdb>=0.3.4",
"humbug",
"pydantic",
"pydantic==1.10.2",
"pyevmasm",
"python-dateutil",
"python-multipart",