pull/876/head
Andrey 2023-08-19 02:55:45 +03:00
rodzic 9a722abcbb
commit 28c660c979
1 zmienionych plików z 78 dodań i 70 usunięć

Wyświetl plik

@ -6,6 +6,7 @@ from collections import OrderedDict
from enum import Enum from enum import Enum
from itertools import chain from itertools import chain
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
import traceback
import boto3 # type: ignore import boto3 # type: ignore
from bugout.data import ( from bugout.data import (
@ -486,7 +487,7 @@ def get_all_entries_from_search(
limit=limit, limit=limit,
offset=offset, offset=offset,
) )
results.extend(existing_methods.results) # type: ignore results.extend(existing_methods.results) # type: ignore
if len(results) != existing_methods.total_results: if len(results) != existing_methods.total_results:
for offset in range(limit, existing_methods.total_results, limit): for offset in range(limit, existing_methods.total_results, limit):
@ -499,7 +500,7 @@ def get_all_entries_from_search(
limit=limit, limit=limit,
offset=offset, offset=offset,
) )
results.extend(existing_methods.results) # type: ignore results.extend(existing_methods.results) # type: ignore
return results return results
@ -849,97 +850,104 @@ def get_list_of_support_interfaces(
Returns list of interfaces supported by given address Returns list of interfaces supported by given address
""" """
_, _, is_contract = check_if_smart_contract( try:
blockchain_type=blockchain_type, address=address, user_token=user_token web3_client = connect(blockchain_type, user_token=user_token)
)
if not is_contract: contract = web3_client.eth.contract(
raise AddressNotSmartContractException(f"Address not are smart contract") address=Web3.toChecksumAddress(address),
abi=supportsInterface_abi,
web3_client = connect(blockchain_type, user_token=user_token)
contract = web3_client.eth.contract(
address=Web3.toChecksumAddress(address),
abi=supportsInterface_abi,
)
calls = []
list_of_interfaces = list(selectors.keys())
list_of_interfaces.sort()
for interaface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(contract.get_function_by_name("supportsInterface"))
.encode_data([bytes.fromhex(interaface)])
.hex(),
)
) )
result = {}
if blockchain_type in multicall_contracts:
calls = [] calls = []
list_of_interfaces = list(selectors.keys()) list_of_interfaces = list(selectors.keys())
list_of_interfaces.sort() list_of_interfaces.sort()
for interface in list_of_interfaces: for interaface in list_of_interfaces:
calls.append( calls.append(
( (
contract.address, contract.address,
FunctionSignature( FunctionSignature(
contract.get_function_by_name("supportsInterface") contract.get_function_by_name("supportsInterface")
) )
.encode_data([bytes.fromhex(interface)]) .encode_data([bytes.fromhex(interaface)])
.hex(), .hex(),
) )
) )
try: result = {}
multicall_result = multicall(
web3_client=web3_client,
blockchain_type=blockchain_type,
calls=calls,
method=multicall_method,
)
except Exception as e:
logger.error(f"Error while getting list of support interfaces: {e}")
for i, selector in enumerate(list_of_interfaces): if blockchain_type in multicall_contracts:
if multicall_result[i][0]: calls = []
supported = FunctionSignature(
contract.get_function_by_name("supportsInterface")
).decode_data(multicall_result[i][1])
if supported[0]: list_of_interfaces = list(selectors.keys())
result[selectors[selector]["name"]] = { # type: ignore
"selector": selector, list_of_interfaces.sort()
"abi": selectors[selector]["abi"], # type: ignore
for interface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(
contract.get_function_by_name("supportsInterface")
)
.encode_data([bytes.fromhex(interface)])
.hex(),
)
)
try:
multicall_result = multicall(
web3_client=web3_client,
blockchain_type=blockchain_type,
calls=calls,
method=multicall_method,
)
except Exception as e:
logger.error(f"Error while getting list of support interfaces: {e}")
for i, selector in enumerate(list_of_interfaces):
if multicall_result[i][0]:
supported = FunctionSignature(
contract.get_function_by_name("supportsInterface")
).decode_data(multicall_result[i][1])
if supported[0]:
result[selectors[selector]["name"]] = { # type: ignore
"selector": selector,
"abi": selectors[selector]["abi"], # type: ignore
}
else:
general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"]
basic_selectors = {
interface["name"]: selector
for selector, interface in selectors.items()
if interface["name"] in general_interfaces
}
for interface_name, selector in basic_selectors.items():
selector_result = contract.get_function_by_name(
"supportsInterface"
).call(bytes.fromhex(selector))
if selector_result:
result[interface_name] = {
"selector": basic_selectors[interface_name],
"abi": selectors[selectors[interface_name]]["abi"],
} }
except Exception as err:
traceback.print_exc()
logger.error(f"Error while getting list of support interfaces: {err}")
_, _, is_contract = check_if_smart_contract(
blockchain_type=blockchain_type, address=address, user_token=user_token
)
else: if not is_contract:
general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"] raise AddressNotSmartContractException(f"Address not are smart contract")
else:
basic_selectors = { raise err
interface["name"]: selector
for selector, interface in selectors.items()
if interface["name"] in general_interfaces
}
for interface_name, selector in basic_selectors.items():
selector_result = contract.get_function_by_name("supportsInterface").call(
bytes.fromhex(selector)
)
if selector_result:
result[interface_name] = {
"selector": basic_selectors[interface_name],
"abi": selectors[selectors[interface_name]]["abi"],
}
return result return result