kopia lustrzana https://github.com/bugout-dev/moonstream
Merge pull request #855 from moonstream-to/fix-missing-addresses
Revert jobs endpoint changes.pull/858/head
commit
03fcf456c7
|
@ -784,117 +784,6 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
|
|||
return hash
|
||||
|
||||
|
||||
def parse_abi_to_name_tags(user_abi: List[Dict[str, Any]]):
|
||||
return [
|
||||
f"abi_name:{method['name']}"
|
||||
for method in user_abi
|
||||
if method["type"] in ("event", "function")
|
||||
]
|
||||
|
||||
|
||||
def filter_tasks(entries, tag_filters: set):
|
||||
return [entry for entry in entries if any(tag in tag_filters for tag in entry.tags)]
|
||||
|
||||
|
||||
def subscriptions_to_moonworm_jobs(
|
||||
journal_id: str,
|
||||
subscriptions: List[data.SubscriptionResourceData],
|
||||
limit: int = 100,
|
||||
) -> Dict[str, List[BugoutSearchResult]]:
|
||||
"""
|
||||
Fetch tasks from journal and filter them by user abi
|
||||
"""
|
||||
|
||||
query_per_chain: Dict[str, List[str]] = {}
|
||||
|
||||
for subscription in subscriptions:
|
||||
subscription_type_id = subscription.subscription_type_id
|
||||
|
||||
if subscription_type_id not in query_per_chain:
|
||||
query_per_chain[subscription_type_id] = []
|
||||
|
||||
query_per_chain[subscription_type_id].append(
|
||||
f"?tag:address:{subscription.address}"
|
||||
)
|
||||
|
||||
entries = []
|
||||
|
||||
for chain in query_per_chain:
|
||||
query_per_chain[chain].append(f"tag:subscription_type:{chain}")
|
||||
entries = get_all_entries_from_search(
|
||||
journal_id=journal_id,
|
||||
search_query=f" ".join(query_per_chain[chain]),
|
||||
limit=limit,
|
||||
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
|
||||
)
|
||||
|
||||
entries.extend(entries)
|
||||
|
||||
result: Dict[str, List[BugoutSearchResult]] = {}
|
||||
|
||||
### entries it's tasks in journal in tags we have blockchain_type, address, subscription_type_id, abi_name
|
||||
### we create object like {subscription_id: {address: [tasks]}}
|
||||
|
||||
moonworm_tasks_normilized: Dict[
|
||||
str, Dict[str, Dict[str, List[BugoutSearchResult]]]
|
||||
] = {}
|
||||
|
||||
for entry in entries:
|
||||
for tag in entry.tags:
|
||||
if tag.startswith("subscription_type"):
|
||||
subscription_type_id = tag.split(":")[-1]
|
||||
if tag.startswith("address"):
|
||||
address = tag.split(":")[-1]
|
||||
|
||||
if subscription_type_id not in moonworm_tasks_normilized:
|
||||
moonworm_tasks_normilized[subscription_type_id] = {}
|
||||
|
||||
if address not in moonworm_tasks_normilized[subscription_type_id]:
|
||||
moonworm_tasks_normilized[subscription_type_id][address] = []
|
||||
|
||||
moonworm_tasks_normilized[subscription_type_id][address].append(entry)
|
||||
|
||||
for subscription in subscriptions:
|
||||
try:
|
||||
abi = json.loads(subscription.abi)
|
||||
except Exception as e:
|
||||
logger.error(f"Error parse abi: {str(e)}")
|
||||
continue
|
||||
abi_name_tags = parse_abi_to_name_tags(abi)
|
||||
|
||||
### filter tasks by abi_name_tags
|
||||
|
||||
if subscription.subscription_type_id not in moonworm_tasks_normilized:
|
||||
continue
|
||||
|
||||
filtered_tasks = filter_tasks(
|
||||
moonworm_tasks_normilized[subscription.subscription_type_id][
|
||||
subscription.address
|
||||
],
|
||||
abi_name_tags,
|
||||
)
|
||||
|
||||
result[subscription.id] = filtered_tasks
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_moonworm_tasks_batch(
|
||||
subscriptions: List[data.SubscriptionResourceData],
|
||||
token: str,
|
||||
) -> Dict[str, List[BugoutSearchResult]]:
|
||||
moonworm_tasks_batch: Dict[
|
||||
str, List[BugoutSearchResult]
|
||||
] = subscriptions_to_moonworm_jobs(
|
||||
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
|
||||
subscriptions=[
|
||||
subscription for subscription in subscriptions if subscription.abi
|
||||
],
|
||||
)
|
||||
|
||||
return moonworm_tasks_batch
|
||||
|
||||
|
||||
def get_list_of_support_interfaces(
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
address: str,
|
||||
|
|
|
@ -47,14 +47,13 @@ class SubscriptionTypesListResponse(BaseModel):
|
|||
class SubscriptionResourceData(BaseModel):
|
||||
id: str
|
||||
address: Optional[str]
|
||||
abi: Optional[str]
|
||||
abi: Optional[Union[str, bool]]
|
||||
color: Optional[str]
|
||||
label: Optional[str]
|
||||
description: Optional[str] = None
|
||||
tags: List[str] = Field(default_factory=list)
|
||||
user_id: str
|
||||
subscription_type_id: Optional[str]
|
||||
jobs_status: Optional[List[Dict[str, Any]]] = Field(default_factory=list)
|
||||
created_at: Optional[datetime]
|
||||
updated_at: Optional[datetime]
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ from ..actions import (
|
|||
EntityCollectionNotFoundException,
|
||||
check_if_smartcontract,
|
||||
get_list_of_support_interfaces,
|
||||
get_moonworm_tasks_batch,
|
||||
)
|
||||
from ..admin import subscription_types
|
||||
from .. import data
|
||||
|
@ -357,7 +356,9 @@ async def get_subscriptions_handler(
|
|||
address=subscription.address,
|
||||
color=color,
|
||||
label=label,
|
||||
abi=subscription.secondary_fields.get("abi", None),
|
||||
abi="True"
|
||||
if subscription.secondary_fields.get("abi", None)
|
||||
else "False", ### TODO(ANDREY): remove this hack when frontend is updated
|
||||
description=subscription.secondary_fields.get("description"),
|
||||
tags=normalized_entity_tags,
|
||||
subscription_type_id=subscription_type_id,
|
||||
|
@ -366,12 +367,6 @@ async def get_subscriptions_handler(
|
|||
)
|
||||
)
|
||||
|
||||
jobs = get_moonworm_tasks_batch(subscriptions=subscriptions, token=token)
|
||||
|
||||
for subscription in subscriptions:
|
||||
if subscription.id in jobs:
|
||||
subscription.jobs_status = jobs[subscription.id]
|
||||
|
||||
return data.SubscriptionsListResponse(subscriptions=subscriptions)
|
||||
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import logging
|
||||
from uuid import UUID
|
||||
|
||||
from typing import Any, Optional, Union, Callable
|
||||
from typing import Any, Optional, Union, Callable, Dict
|
||||
from web3 import Web3
|
||||
from web3.middleware import geth_poa_middleware
|
||||
from eth_abi import encode_single, decode_single
|
||||
|
@ -45,7 +45,7 @@ def connect(
|
|||
access_id: Optional[UUID] = None,
|
||||
user_token: Optional[UUID] = None,
|
||||
) -> Web3:
|
||||
request_kwargs: D = {}
|
||||
request_kwargs: Dict[str, Any] = {}
|
||||
|
||||
if blockchain_type != AvailableBlockchainType.WYRM:
|
||||
request_kwargs = {
|
||||
|
|
Ładowanie…
Reference in New Issue