pull/847/head
Andrey 2023-07-13 05:44:44 +03:00
rodzic 3cc70294a0
commit 212bf0931f
3 zmienionych plików z 89 dodań i 36 usunięć

Wyświetl plik

@ -783,6 +783,7 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
def parse_abi_to_name_tags(user_abi: List[Dict[str, Any]]):
print(type(user_abi))
return [
f"abi_name:{method['name']}"
for method in user_abi
@ -790,52 +791,96 @@ def parse_abi_to_name_tags(user_abi: List[Dict[str, Any]]):
]
def filter_tasks(entries, tag_filters):
def filter_tasks(entries, tag_filters: set):
return [entry for entry in entries if any(tag in tag_filters for tag in entry.tags)]
def fetch_and_filter_tasks(
journal_id, address, subscription_type_id, token, user_abi, limit=100
) -> List[BugoutSearchResult]:
def subscriptions_to_moonworm_jobs(
journal_id: str,
subscriptions: List[data.SubscriptionResourceData],
limit: int = 100,
) -> Dict[str, List[BugoutSearchResult]]:
"""
Fetch tasks from journal and filter them by user abi
"""
query_list: List[str] = []
for subscription in subscriptions:
query_list.append(
f"?tag:address:{subscription.address} ?tag:subscription_type:{subscription.subscription_type_id}"
)
entries = get_all_entries_from_search(
journal_id=journal_id,
search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}",
search_query=f" ".join(query_list),
limit=limit,
token=token,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
user_loaded_abi_tags = parse_abi_to_name_tags(json.loads(user_abi))
result: Dict[str, List[BugoutSearchResult]] = {}
moonworm_tasks = filter_tasks(entries, user_loaded_abi_tags)
### entries it's tasks in journal in tags we have blockchain_type, address, subscription_type_id, abi_name
### we create object like {subscription_id: {address: [tasks]}}
return moonworm_tasks
moonworm_tasks_normilized: Dict[
str, Dict[str, Dict[str, List[BugoutSearchResult]]]
] = {}
for entry in entries:
for tag in entry.tags:
if tag.startswith("subscription_type"):
subscription_type_id = tag.split(":")[-1]
if tag.startswith("address"):
address = tag.split(":")[-1]
def get_moonworm_tasks(
subscription_type_id: str,
address: str,
user_abi: List[Dict[str, Any]],
) -> List[BugoutSearchResult]:
"""
Get moonworm tasks from journal and filter them by user abi
"""
if subscription_type_id not in moonworm_tasks_normilized:
moonworm_tasks_normilized[subscription_type_id] = {}
try:
moonworm_tasks = fetch_and_filter_tasks(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
address=address,
subscription_type_id=subscription_type_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_abi=user_abi,
if address not in moonworm_tasks_normilized[subscription_type_id]:
moonworm_tasks_normilized[subscription_type_id][address] = {}
moonworm_tasks_normilized[subscription_type_id][address].append(entry)
print(moonworm_tasks_normilized.keys())
for subscription in subscriptions:
try:
abi = json.loads(subscription.abi)
except Exception as e:
logger.error(f"Error parse abi: {str(e)}")
print(subscription.abi)
continue
abi_name_tags = parse_abi_to_name_tags(abi)
### filter tasks by abi_name_tags
filtered_tasks = filter_tasks(
moonworm_tasks_normilized[subscription.subscription_type_id][
subscription.address
],
abi_name_tags,
)
except Exception as e:
logger.error(f"Error get moonworm tasks: {str(e)}")
MoonstreamHTTPException(status_code=500, internal_error=e)
return moonworm_tasks
result[subscription.id] = filtered_tasks
return result
def get_moonworm_tasks_batch(
subscriptions: List[data.SubscriptionResourceData],
token: str,
) -> Dict[str, List[BugoutSearchResult]]:
moonworm_tasks_batch: Dict[
str, List[BugoutSearchResult]
] = subscriptions_to_moonworm_jobs(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
subscriptions=[
subscription for subscription in subscriptions if subscription.abi
],
)
return moonworm_tasks_batch
def get_list_of_support_interfaces(

Wyświetl plik

@ -49,6 +49,7 @@ class SubscriptionResourceData(BaseModel):
label: Optional[str]
user_id: str
subscription_type_id: Optional[str]
jobs_status: Optional[Dict[str, Any]] = Field(default_factory=dict)
created_at: Optional[datetime]
updated_at: Optional[datetime]

Wyświetl plik

@ -19,9 +19,9 @@ from ..actions import (
apply_moonworm_tasks,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
get_moonworm_tasks,
check_if_smartcontract,
get_list_of_support_interfaces,
get_moonworm_tasks_batch,
)
from ..admin import subscription_types
from .. import data
@ -310,12 +310,19 @@ async def get_subscriptions_handler(
address=subscription.address,
color=color,
label=label,
abi="True" if subscription.secondary_fields.get("abi") else None,
abi=subscription.secondary_fields.get("abi", None),
subscription_type_id=subscription_type_id,
updated_at=subscription.updated_at,
created_at=subscription.created_at,
)
)
jobs = get_moonworm_tasks_batch(subscriptions=subscriptions, token=token)
for subscription in subscriptions:
if subscription.id in jobs:
subscription.jobs_status = jobs[subscription.id]
return data.SubscriptionsListResponse(subscriptions=subscriptions)
@ -537,13 +544,13 @@ async def get_subscription_jobs_handler(
subscription_address = subscription_resource.address
get_moonworm_jobs_response = get_moonworm_tasks(
subscription_type_id=subscription_type_id,
address=subscription_address,
user_abi=subscription_resource.secondary_fields.get("abi") or [],
)
# get_moonworm_jobs_response = get_moonworm_tasks(
# subscription_type_id=subscription_type_id,
# address=subscription_address,
# user_abi=subscription_resource.secondary_fields.get("abi") or [],
# )
return get_moonworm_jobs_response
return []
@router.get(