Merge pull request #876 from moonstream-to/return-jobs-endpoint

Return jobs endpoint
pull/915/head
Andrey Dolgolev 2023-08-31 13:40:50 +03:00 zatwierdzone przez GitHub
commit 87997add7d
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
2 zmienionych plików z 176 dodań i 79 usunięć

Wyświetl plik

@ -785,6 +785,62 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
return hash
def parse_abi_to_name_tags(user_abi: List[Dict[str, Any]]):
return [
f"abi_name:{method['name']}"
for method in user_abi
if method["type"] in ("event", "function")
]
def filter_tasks(entries, tag_filters):
return [entry for entry in entries if any(tag in tag_filters for tag in entry.tags)]
def fetch_and_filter_tasks(
journal_id, address, subscription_type_id, token, user_abi, limit=100
) -> List[BugoutSearchResult]:
"""
Fetch tasks from journal and filter them by user abi
"""
entries = get_all_entries_from_search(
journal_id=journal_id,
search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}",
limit=limit,
token=token,
)
user_loaded_abi_tags = parse_abi_to_name_tags(json.loads(user_abi))
moonworm_tasks = filter_tasks(entries, user_loaded_abi_tags)
return moonworm_tasks
def get_moonworm_tasks(
subscription_type_id: str,
address: str,
user_abi: List[Dict[str, Any]],
) -> List[BugoutSearchResult]:
"""
Get moonworm tasks from journal and filter them by user abi
"""
try:
moonworm_tasks = fetch_and_filter_tasks(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
address=address,
subscription_type_id=subscription_type_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_abi=user_abi,
)
except Exception as e:
logger.error(f"Error get moonworm tasks: {str(e)}")
MoonstreamHTTPException(status_code=500, internal_error=e)
return moonworm_tasks
def get_list_of_support_interfaces(
blockchain_type: AvailableBlockchainType,
address: str,
@ -795,97 +851,86 @@ def get_list_of_support_interfaces(
Returns list of interfaces supported by given address
"""
_, _, is_contract = check_if_smart_contract(
blockchain_type=blockchain_type, address=address, user_token=user_token
)
if not is_contract:
raise AddressNotSmartContractException(f"Address not are smart contract")
web3_client = connect(blockchain_type, user_token=user_token)
contract = web3_client.eth.contract(
address=Web3.toChecksumAddress(address),
abi=supportsInterface_abi,
)
calls = []
list_of_interfaces = list(selectors.keys())
list_of_interfaces.sort()
for interaface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(contract.get_function_by_name("supportsInterface"))
.encode_data([bytes.fromhex(interaface)])
.hex(),
)
try:
_, _, is_contract = check_if_smart_contract(
blockchain_type=blockchain_type, address=address, user_token=user_token
)
result = {}
if not is_contract:
raise AddressNotSmartContractException(f"Address not are smart contract")
if blockchain_type in multicall_contracts:
calls = []
web3_client = connect(blockchain_type, user_token=user_token)
list_of_interfaces = list(selectors.keys())
contract = web3_client.eth.contract(
address=Web3.toChecksumAddress(address),
abi=supportsInterface_abi,
)
list_of_interfaces.sort()
result = {}
for interface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(
contract.get_function_by_name("supportsInterface")
if blockchain_type in multicall_contracts:
calls = []
list_of_interfaces = list(selectors.keys())
list_of_interfaces.sort()
for interface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(
contract.get_function_by_name("supportsInterface")
)
.encode_data([bytes.fromhex(interface)])
.hex(),
)
.encode_data([bytes.fromhex(interface)])
.hex(),
)
)
try:
multicall_result = multicall(
web3_client=web3_client,
blockchain_type=blockchain_type,
calls=calls,
method=multicall_method,
)
except Exception as e:
logger.error(f"Error while getting list of support interfaces: {e}")
try:
multicall_result = multicall(
web3_client=web3_client,
blockchain_type=blockchain_type,
calls=calls,
method=multicall_method,
)
except Exception as e:
logger.error(f"Error while getting list of support interfaces: {e}")
for i, selector in enumerate(list_of_interfaces):
if multicall_result[i][0]:
supported = FunctionSignature(
contract.get_function_by_name("supportsInterface")
).decode_data(multicall_result[i][1])
for i, selector in enumerate(list_of_interfaces):
if multicall_result[i][0]:
supported = FunctionSignature(
contract.get_function_by_name("supportsInterface")
).decode_data(multicall_result[i][1])
if supported[0]:
result[selectors[selector]["name"]] = { # type: ignore
"selector": selector,
"abi": selectors[selector]["abi"], # type: ignore
if supported[0]:
result[selectors[selector]["name"]] = { # type: ignore
"selector": selector,
"abi": selectors[selector]["abi"], # type: ignore
}
else:
general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"]
basic_selectors = {
interface["name"]: selector
for selector, interface in selectors.items()
if interface["name"] in general_interfaces
}
for interface_name, selector in basic_selectors.items():
selector_result = contract.functions.supportsInterface(
bytes.fromhex(selector)
).call() # returns bool
if selector_result:
result[interface_name] = {
"selector": basic_selectors[interface_name],
"abi": selectors[basic_selectors[interface_name]]["abi"],
}
else:
general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"]
basic_selectors = {
interface["name"]: selector
for selector, interface in selectors.items()
if interface["name"] in general_interfaces
}
for selector_name in basic_selectors:
selector_result = contract.get_function_by_name("supportsInterface").call(
bytes.fromhex(selectors[selector_name])
)
if selector_result:
result[selector_name] = {
"selector": basic_selectors[selector_name],
"abi": selectors[selectors[selector_name]]["abi"],
}
except Exception as err:
logger.error(f"Error while getting list of support interfaces: {err}")
MoonstreamHTTPException(status_code=500, internal_error=err)
return result

Wyświetl plik

@ -21,6 +21,7 @@ from ..actions import (
check_if_smart_contract,
get_entity_subscription_journal_id,
get_list_of_support_interfaces,
get_moonworm_tasks,
validate_abi_json,
)
from ..admin import subscription_types
@ -642,6 +643,57 @@ async def list_subscription_types() -> data.SubscriptionTypesListResponse:
return data.SubscriptionTypesListResponse(subscription_types=results)
@router.get(
"/{subscription_id}/jobs",
tags=["subscriptions"],
response_model=List[BugoutSearchResult],
)
async def get_subscription_jobs_handler(
request: Request,
subscription_id: str = Path(...),
) -> Any:
token = request.state.token
user = request.state.user
try:
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
# get subscription entity
subscription_resource = bc.get_entity(
token=token,
journal_id=journal_id,
entity_id=subscription_id,
)
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
logger.error(f"Error get subscriptions for user ({user}), error: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
for field in subscription_resource.required_fields:
if "subscription_type_id" in field:
subscription_type_id = field["subscription_type_id"]
subscription_address = subscription_resource.address
get_moonworm_jobs_response = get_moonworm_tasks(
subscription_type_id=subscription_type_id,
address=subscription_address,
user_abi=subscription_resource.secondary_fields.get("abi") or [],
)
return get_moonworm_jobs_response
@router.get(
"/is_contract",
tags=["subscriptions"],