Add init version.

pull/478/head
Andrey Dolgolev 2021-12-03 22:53:05 +02:00
rodzic 6c895bc114
commit b35a771a95
3 zmienionych plików z 167 dodań i 7 usunięć

Wyświetl plik

@ -1,6 +1,9 @@
import hashlib
import json
from itertools import chain
import logging
from typing import Optional, Dict, Any, Union
from typing import List, Optional, Dict, Any, Union
import time
from enum import Enum
import uuid
@ -22,7 +25,7 @@ from . import data
from .reporter import reporter
from .middleware import MoonstreamHTTPException
from .settings import ETHERSCAN_SMARTCONTRACTS_BUCKET
from bugout.data import BugoutResource
from bugout.data import BugoutResource, DashboardMeta
from .settings import (
MOONSTREAM_APPLICATION_ID,
bugout_client as bc,
@ -31,6 +34,7 @@ from .settings import (
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from web3 import Web3
@ -430,3 +434,132 @@ def upload_abi_to_s3(
update["s3_path"] = result_key
return update
def get_all_entries_from_search(
journal_id: str, search_query: str, limit: int, token: str
) -> List[Any]:
"""
Get all required entries from journal using search interface
"""
skips = 0
offset = 0
results: List[Any] = []
while skips < 3:
try:
existing_metods = bc.search(
token=token,
journal_id=journal_id,
query=search_query,
content=False,
timeout=10.0,
limit=limit,
offset=offset,
)
results.extend(existing_metods.results)
except:
time.sleep(0.5)
skips += 1
continue
if len(results) == existing_metods.total_results:
break
else:
offset += limit
continue
return results
def apply_moonworm_tasks(
s3_client: Any,
dashboard_subscriptions: List[DashboardMeta],
available_subscriptions: Dict[uuid.UUID, Dict[str, Any]],
) -> None:
"""
Get list of subscriptions loads abi and apply them as moonworm tasks if it not exist
"""
entries_pack = []
for dashboard_subscription in dashboard_subscriptions:
if dashboard_subscription.subscription_id in available_subscriptions.keys():
bucket = available_subscriptions[dashboard_subscription.subscription_id][
"bucket"
]
key = available_subscriptions[dashboard_subscription.subscription_id][
"s3_path"
]
if bucket is None or key is None:
logger.error(
f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi"
)
s3_path = f"s3://{bucket}/{key}"
try:
response = s3_client.get_object(
Bucket=bucket,
Key=key,
)
except s3_client.exceptions.NoSuchKey as e:
logger.error(
f"Error getting Abi for subscription {str(dashboard_subscription.subscription_id)} S3 {s3_path} does not exist : {str(e)}"
)
abi = json.loads(response["Body"].read())
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"tag:address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}",
limit=100,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
existing_tags = [entry.tags for entry in entries]
existing_hashes = [
tag.split(":")[-1]
for tag in chain(*existing_tags)
if "abi_metod_hash" in tag
]
abi_hashes_dict = {
hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method
for method in abi
if (method["type"] in ("event", "function"))
and (
"stateMutability" in method and method["stateMutability"] == "view"
)
}
for hash in abi_hashes_dict:
if hash not in existing_hashes:
entries_pack.append(
{
"title": available_subscriptions[
dashboard_subscription.subscription_id
]["address"],
"content": json.dumps(abi_hashes_dict[hash], indent=4),
"tags": [
f"address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}",
f"type:{abi_hashes_dict[hash]['type']}",
f"abi_metod_hash:{hash}",
f"status:active",
],
}
)
if len(entries_pack) > 0:
bc.create_entries_pack(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries=entries_pack,
timeout=10,
)

Wyświetl plik

@ -7,7 +7,7 @@ from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Request, Query, Body
from fastapi import APIRouter, Request, Query, Body, BackgroundTasks
from .. import actions
from .. import data
@ -20,7 +20,6 @@ from ..settings import (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
import pprint
logger = logging.getLogger(__name__)
@ -41,7 +40,9 @@ blockchain_by_subscription_id = {
@router.post("/", tags=["dashboards"], response_model=BugoutResource)
async def add_dashboard_handler(
request: Request, dashboard: data.DashboardCreate = Body(...)
request: Request,
background_tasks: BackgroundTasks,
dashboard: data.DashboardCreate = Body(...),
) -> BugoutResource:
"""
Add subscription to blockchain stream data for user.
@ -73,7 +74,7 @@ async def add_dashboard_handler(
s3_client = boto3.client("s3")
available_subscriptions = {
available_subscriptions: Dict[UUID, Dict[str, Any]] = {
resource.id: resource.resource_data for resource in resources.resources
}
@ -150,6 +151,14 @@ async def add_dashboard_handler(
logger.error(f"Error creating dashboard resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
# Generate tasks for moonworm
background_tasks.add_task(
actions.apply_moonworm_tasks,
s3_client,
dashboard_subscriptions,
available_subscriptions,
)
return resource
@ -231,7 +240,10 @@ async def get_dashboard_handler(
@router.put("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource)
async def update_dashboard_handler(
request: Request, dashboard_id: str, dashboard: data.DashboardUpdate = Body(...)
request: Request,
dashboard_id: str,
background_tasks: BackgroundTasks,
dashboard: data.DashboardUpdate = Body(...),
) -> BugoutResource:
"""
Update dashboards mainly fully overwrite name and subscription metadata
@ -338,6 +350,13 @@ async def update_dashboard_handler(
logger.error(f"Error updating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
background_tasks.add_task(
actions.apply_moonworm_tasks,
s3_client,
dashboard_subscriptions,
available_subscriptions,
)
return resource

Wyświetl plik

@ -84,3 +84,11 @@ if MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX is None:
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX = (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX.rstrip("/")
)
MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL", ""
)
if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "":
raise ValueError(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set"
)