Move create moonworm task to subscriptions api.

pull/478/head
Andrey Dolgolev 2021-12-06 21:42:30 +02:00
rodzic 7379c518a0
commit ac852f51bb
3 zmienionych plików z 55 dodań i 93 usunięć

Wyświetl plik

@ -480,8 +480,8 @@ def get_all_entries_from_search(
def apply_moonworm_tasks(
s3_client: Any,
dashboard_subscriptions: List[data.DashboardMeta],
available_subscriptions: Dict[uuid.UUID, Dict[str, Any]],
abi: Any,
address: str,
) -> None:
"""
Get list of subscriptions loads abi and apply them as moonworm tasks if it not exist
@ -489,78 +489,45 @@ def apply_moonworm_tasks(
entries_pack = []
for dashboard_subscription in dashboard_subscriptions:
if dashboard_subscription.subscription_id in available_subscriptions.keys():
try:
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"tag:address:{address}",
limit=100,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
try:
bucket = available_subscriptions[
dashboard_subscription.subscription_id
]["bucket"]
key = available_subscriptions[dashboard_subscription.subscription_id][
"s3_path"
]
existing_tags = [entry.tags for entry in entries]
if bucket is None or key is None:
logger.error(
f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi"
)
existing_hashes = [
tag.split(":")[-1]
for tag in chain(*existing_tags)
if "abi_metod_hash" in tag
]
s3_path = f"s3://{bucket}/{key}"
abi_hashes_dict = {
hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method
for method in abi
if (method["type"] in ("event", "function"))
and (method.get("stateMutability", "") != "view")
}
try:
response = s3_client.get_object(
Bucket=bucket,
Key=key,
)
except s3_client.exceptions.NoSuchKey as e:
logger.error(
f"Error getting Abi for subscription {str(dashboard_subscription.subscription_id)} S3 {s3_path} does not exist : {str(e)}"
)
abi = json.loads(response["Body"].read())
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"tag:address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}",
limit=100,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
for hash in abi_hashes_dict:
if hash not in existing_hashes:
entries_pack.append(
{
"title": address,
"content": json.dumps(abi_hashes_dict[hash], indent=4),
"tags": [
f"address:{address}",
f"type:{abi_hashes_dict[hash]['type']}",
f"abi_metod_hash:{hash}",
f"status:active",
],
}
)
existing_tags = [entry.tags for entry in entries]
existing_hashes = [
tag.split(":")[-1]
for tag in chain(*existing_tags)
if "abi_metod_hash" in tag
]
abi_hashes_dict = {
hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method
for method in abi
if (method["type"] in ("event", "function"))
and (method.get("stateMutability", "") != "view")
}
for hash in abi_hashes_dict:
if hash not in existing_hashes:
entries_pack.append(
{
"title": available_subscriptions[
dashboard_subscription.subscription_id
]["address"],
"content": json.dumps(abi_hashes_dict[hash], indent=4),
"tags": [
f"address:{available_subscriptions[dashboard_subscription.subscription_id]['address']}",
f"type:{abi_hashes_dict[hash]['type']}",
f"abi_metod_hash:{hash}",
f"status:active",
],
}
)
except Exception as e:
reporter.error_report(e)
except Exception as e:
reporter.error_report(e)
if len(entries_pack) > 0:
bc.create_entries_pack(

Wyświetl plik

@ -7,7 +7,7 @@ from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Request, Query, Body, BackgroundTasks
from fastapi import APIRouter, Request, Query, Body
from .. import actions
from .. import data
@ -41,7 +41,6 @@ blockchain_by_subscription_id = {
@router.post("/", tags=["dashboards"], response_model=BugoutResource)
async def add_dashboard_handler(
request: Request,
background_tasks: BackgroundTasks,
dashboard: data.DashboardCreate = Body(...),
) -> BugoutResource:
"""
@ -150,15 +149,7 @@ async def add_dashboard_handler(
except Exception as e:
logger.error(f"Error creating dashboard resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
# Generate tasks for moonworm
background_tasks.add_task(
actions.apply_moonworm_tasks,
s3_client,
dashboard_subscriptions,
available_subscriptions,
)
return resource
@ -242,7 +233,6 @@ async def get_dashboard_handler(
async def update_dashboard_handler(
request: Request,
dashboard_id: str,
background_tasks: BackgroundTasks,
dashboard: data.DashboardUpdate = Body(...),
) -> BugoutResource:
"""
@ -350,13 +340,6 @@ async def update_dashboard_handler(
logger.error(f"Error updating subscription resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
background_tasks.add_task(
actions.apply_moonworm_tasks,
s3_client,
dashboard_subscriptions,
available_subscriptions,
)
return resource

Wyświetl plik

@ -10,13 +10,10 @@ from typing import List, Optional, Dict, Any
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Depends, Request, Form
from fastapi import APIRouter, Depends, Request, Form, BackgroundTasks
from web3 import Web3
from ..actions import (
validate_abi_json,
upload_abi_to_s3,
)
from ..actions import validate_abi_json, upload_abi_to_s3, apply_moonworm_tasks
from ..admin import subscription_types
from .. import data
from ..middleware import MoonstreamHTTPException
@ -41,6 +38,7 @@ BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
@router.post("/", tags=["subscriptions"], response_model=data.SubscriptionResourceData)
async def add_subscription_handler(
request: Request, # subscription_data: data.CreateSubscriptionRequest = Body(...)
background_tasks: BackgroundTasks,
address: str = Form(...),
color: str = Form(...),
label: str = Form(...),
@ -143,6 +141,12 @@ async def add_subscription_handler(
logger.error(f"Error getting user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
background_tasks.add_task(
apply_moonworm_tasks,
json_abi,
address,
)
return data.SubscriptionResourceData(
id=str(resource.id),
user_id=resource.resource_data["user_id"],
@ -234,6 +238,7 @@ async def get_subscriptions_handler(request: Request) -> data.SubscriptionsListR
async def update_subscriptions_handler(
request: Request,
subscription_id: str,
background_tasks: BackgroundTasks,
color: Optional[str] = Form(None),
label: Optional[str] = Form(None),
abi: Optional[str] = Form(None),
@ -301,6 +306,13 @@ async def update_subscriptions_handler(
logger.error(f"Error getting user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if abi:
background_tasks.add_task(
apply_moonworm_tasks,
json_abi,
subscription_resource.resource_data["address"],
)
return data.SubscriptionResourceData(
id=str(resource.id),
user_id=resource.resource_data["user_id"],