Merge branch 'main' into fix-list-type

pull/25/head
yhtiyar 2021-12-07 02:11:19 +03:00
commit 2901cc7af0
7 zmienionych plików z 535 dodań i 64 usunięć

Wyświetl plik

@ -1,3 +1,5 @@
from collections import defaultdict
import copy
import keyword
import logging
import os
@ -5,6 +7,7 @@ from typing import Any, Dict, List, Sequence, Union, cast
import black
import black.mode
import inflection
import libcst as cst
from ..version import MOONWORM_VERSION
@ -34,29 +37,33 @@ def format_code(code: str) -> str:
def make_annotation(types: list, optional: bool = False):
if len(types) == 1:
return cst.Annotation(annotation=cst.Name(types[0]))
union_slice = []
for _type in types:
union_slice.append(
cst.SubscriptElement(
slice=cst.Index(
value=cst.Name(_type),
)
),
annotation = cst.Annotation(annotation=cst.Name(types[0]))
if len(types) > 1:
union_slice = []
for _type in types:
union_slice.append(
cst.SubscriptElement(
slice=cst.Index(
value=cst.Name(_type),
)
),
)
annotation = cst.Annotation(
annotation=cst.Subscript(value=cst.Name("Union"), slice=union_slice)
)
annotation = cst.Annotation(
annotation=cst.Subscript(value=cst.Name("Union"), slice=union_slice)
)
if optional:
annotation = cst.Annotation(
annotation=cst.Subscript(
value=cst.Name("Optional"),
slice=cast(Sequence[cst.SubscriptElement], annotation),
slice=[
cst.SubscriptElement(slice=cst.Index(value=annotation.annotation))
],
)
)
return annotation
def normalize_abi_name(name: str) -> str:
if keyword.iskeyword(name):
@ -136,6 +143,104 @@ def generate_contract_class(
)
def function_spec(function_abi: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""
Accepts function interface definitions from smart contract ABIs. An example input:
{
"inputs": [
{
"internalType": "uint256",
"name": "_tokenId",
"type": "uint256"
}
],
"name": "getDNA",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
}
Returns an dictionary of the form:
{
"abi": "getDNA",
"method": "get_dna",
"cli": "get-dna",
"inputs": [
{
"abi": "_tokenId",
"method": "_tokenId",
"cli": "--token-id",
"args": "token_id",
"type": int,
"cli_type": int,
},
],
"transact": False,
}
"""
abi_name = function_abi.get("name")
if abi_name is None:
raise ValueError('function_spec -- Valid function ABI must have a "name" field')
underscored_name = inflection.underscore(abi_name)
function_name = normalize_abi_name(underscored_name)
cli_name = inflection.dasherize(underscored_name)
default_input_name = "arg"
default_counter = 1
inputs: List[Dict[str, Any]] = []
for item in function_abi.get("inputs", []):
item_abi_name = item.get("name")
if not item_abi_name:
item_abi_name = f"{default_input_name}{default_counter}"
default_counter += 1
item_method_name = normalize_abi_name(inflection.underscore(item_abi_name))
item_args_name = item_method_name
if item_args_name.startswith("_") or item_args_name.endswith("_"):
item_args_name = item_args_name.strip("_") + "_arg"
item_cli_name = f"--{inflection.dasherize(item_args_name)}"
item_type = python_type(item["type"])[0]
item_cli_type = None
if item_type in {"int", "bool"}:
item_cli_type = item_type
input_spec: Dict[str, Any] = {
"abi": item_abi_name,
"method": item_method_name,
"cli": item_cli_name,
"args": item_args_name,
"type": item_type,
"cli_type": item_cli_type,
}
inputs.append(input_spec)
transact = True
if function_abi.get("stateMutability") == "view":
transact = False
spec = {
"abi": abi_name,
"method": function_name,
"cli": cli_name,
"inputs": inputs,
"transact": transact,
}
return spec
def generate_contract_constructor_function(
func_object: Dict[str, Any]
) -> cst.FunctionDef:

Wyświetl plik

@ -3,9 +3,14 @@ import os
from typing import Any, Dict, List, Optional
import libcst as cst
from libcst._nodes.statement import SimpleStatementLine
from ..version import MOONWORM_VERSION
from .basic import format_code, make_annotation, normalize_abi_name, python_type
from .basic import (
format_code,
function_spec,
make_annotation,
)
BROWNIE_INTERFACE_TEMPLATE_PATH = os.path.join(
os.path.dirname(__file__), "brownie_contract.py.template"
@ -29,9 +34,25 @@ def generate_brownie_contract_class(
name=cst.Name("__init__"),
body=cst.IndentedBlock(
body=[
cst.parse_statement(f'self.contract_name = "{contract_name}"'),
cst.parse_statement("self.address = contract_address"),
cst.parse_statement(
f"self.contract = contract_from_build({contract_name})"
cst.parse_statement("self.contract = None"),
cst.parse_statement(f'self.abi = get_abi_json("{contract_name}")'),
cst.If(
test=cst.Comparison(
left=cst.Attribute(
attr=cst.Name(value="address"),
value=cst.Name(value="self"),
),
comparisons=[
cst.ComparisonTarget(
operator=cst.IsNot(), comparator=cst.Name(value="None")
)
],
),
body=cst.parse_statement(
"self.contract: Optional[Contract] = Contract.from_abi(self.contract_name, self.address, self.abi)"
),
),
]
),
@ -57,7 +78,10 @@ def generate_brownie_contract_class(
contract_constructor["name"] = "constructor"
class_functions = (
[class_constructor]
+ [generate_brownie_constructor_function(contract_constructor)]
+ [
generate_brownie_constructor_function(contract_constructor),
generate_assert_contract_is_instantiated(),
]
+ [
generate_brownie_contract_function(function)
for function in abi
@ -72,37 +96,34 @@ def generate_brownie_contract_class(
def generate_brownie_constructor_function(
func_object: Dict[str, Any]
) -> cst.FunctionDef:
default_param_name = "arg"
default_counter = 1
spec = function_spec(func_object)
func_params = []
func_params.append(cst.Param(name=cst.Name("self")))
param_names = []
for param in func_object["inputs"]:
param_name = normalize_abi_name(param["name"])
if param_name == "":
param_name = f"{default_param_name}{default_counter}"
default_counter += 1
param_type = make_annotation(python_type(param["type"]))
param_names.append(param_name)
for param in spec["inputs"]:
param_type = make_annotation([param["type"]])
param_names.append(param["method"])
func_params.append(
cst.Param(
name=cst.Name(value=param_name),
name=cst.Name(value=param["method"]),
annotation=param_type,
)
)
func_params.append(cst.Param(name=cst.Name("signer")))
func_params.append(cst.Param(name=cst.Name("transaction_config")))
func_name = "deploy"
param_names.append("{'from': signer}")
proxy_call_code = (
f"deployed_contract = self.contract.deploy({','.join(param_names)})"
)
param_names.append("transaction_config")
func_body = cst.IndentedBlock(
body=[
cst.parse_statement(proxy_call_code),
cst.parse_statement("self.address=deployed_contract.address"),
cst.parse_statement(
f"contract_class = contract_from_build(self.contract_name)"
),
cst.parse_statement(
f"deployed_contract = contract_class.deploy({','.join(param_names)})"
),
cst.parse_statement("self.address = deployed_contract.address"),
cst.parse_statement("self.contract = deployed_contract"),
]
)
@ -113,20 +134,46 @@ def generate_brownie_constructor_function(
)
def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.FunctionDef:
def generate_assert_contract_is_instantiated() -> cst.FunctionDef:
function_body = cst.IndentedBlock(
body=[
cst.If(
test=cst.Comparison(
left=cst.Attribute(
attr=cst.Name(value="contract"), value=cst.Name(value="self")
),
comparisons=[
cst.ComparisonTarget(
operator=cst.Is(), comparator=cst.Name(value="None")
)
],
),
body=cst.parse_statement(
'raise Exception("contract has not been instantiated")'
),
),
],
)
function_def = cst.FunctionDef(
name=cst.Name(value="assert_contract_is_instantiated"),
params=cst.Parameters(
params=[cst.Param(name=cst.Name(value="self"))],
),
body=function_body,
returns=cst.Annotation(annotation=cst.Name(value="None")),
)
return function_def
default_param_name = "arg"
default_counter = 1
def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.FunctionDef:
spec = function_spec(func_object)
func_params = []
func_params.append(cst.Param(name=cst.Name("self")))
param_names = []
for param in func_object["inputs"]:
param_name = normalize_abi_name(param["name"])
if param_name == "":
param_name = f"{default_param_name}{default_counter}"
default_counter += 1
param_type = make_annotation(python_type(param["type"]))
for param in spec["inputs"]:
param_type = make_annotation([param["type"]])
param_name = param["method"]
param_names.append(param_name)
func_params.append(
cst.Param(
@ -135,19 +182,26 @@ def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.Funct
)
)
func_raw_name = normalize_abi_name(func_object["name"])
func_name = cst.Name(func_raw_name)
if func_object["stateMutability"] == "view":
proxy_call_code = (
f"return self.contract.{func_raw_name}.call({','.join(param_names)})"
)
else:
func_params.append(cst.Param(name=cst.Name(value="signer")))
param_names.append(f"{{'from': signer}}")
func_raw_name = spec["abi"]
func_python_name = spec["method"]
func_name = cst.Name(value=func_python_name)
if spec["transact"]:
func_params.append(cst.Param(name=cst.Name(value="transaction_config")))
param_names.append("transaction_config")
proxy_call_code = (
f"return self.contract.{func_raw_name}({','.join(param_names)})"
)
func_body = cst.IndentedBlock(body=[cst.parse_statement(proxy_call_code)])
else:
proxy_call_code = (
f"return self.contract.{func_raw_name}.call({','.join(param_names)})"
)
func_body = cst.IndentedBlock(
body=[
cst.parse_statement("self.assert_contract_is_instantiated()"),
cst.parse_statement(proxy_call_code),
]
)
func_returns = cst.Annotation(annotation=cst.Name(value="Any"))
return cst.FunctionDef(
@ -161,7 +215,6 @@ def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.Funct
def generate_get_transaction_config() -> cst.FunctionDef:
function_body = cst.IndentedBlock(
body=[
cst.parse_statement("network.connect(args.network)"),
cst.parse_statement(
"signer = network.accounts.load(args.sender, args.password)"
),
@ -199,6 +252,7 @@ def generate_get_transaction_config() -> cst.FunctionDef:
'transaction_config["required_confs"] = args.confirmations'
),
),
cst.parse_statement("return transaction_config"),
],
)
function_def = cst.FunctionDef(
@ -240,12 +294,20 @@ def generate_cli_handler(
Returns None if it is not appropriate for the given function to have a handler (e.g. fallback or
receive). constructor is handled separately with a deploy handler.
"""
function_name = function_abi.get("name")
if function_name is None:
return None
spec = function_spec(function_abi)
function_name = spec["method"]
function_body_raw: List[cst.CSTNode] = []
# Instantiate the contract
function_body_raw.extend(
[
cst.parse_statement("network.connect(args.network)"),
cst.parse_statement(f"contract = {contract_name}(args.address)"),
]
)
# If a transaction is required, extract transaction parameters from CLI
requires_transaction = True
if function_abi["stateMutability"] == "view":
requires_transaction = False
@ -254,7 +316,43 @@ def generate_cli_handler(
function_body_raw.append(
cst.parse_statement("transaction_config = get_transaction_config(args)")
)
function_body_raw.append(cst.parse_statement("pass"))
# Call contract method
call_args: List[cst.Arg] = []
for param in spec["inputs"]:
call_args.append(
cst.Arg(
keyword=cst.Name(value=param["method"]),
value=cst.Attribute(
attr=cst.Name(value=param["args"]), value=cst.Name(value="args")
),
)
)
if requires_transaction:
call_args.append(
cst.Arg(
keyword=cst.Name(value="transaction_config"),
value=cst.Name(value="transaction_config"),
)
)
method_call = cst.Call(
func=cst.Attribute(
attr=cst.Name(value=spec["method"]),
value=cst.Name(value="contract"),
),
args=call_args,
)
method_call_result_statement = cst.SimpleStatementLine(
body=[
cst.Assign(
targets=[cst.AssignTarget(target=cst.Name(value="result"))],
value=method_call,
)
]
)
function_body_raw.append(method_call_result_statement)
function_body_raw.append(cst.parse_statement("print(result)"))
function_body = cst.IndentedBlock(body=function_body_raw)
@ -279,6 +377,175 @@ def generate_cli_handler(
return function_def
def generate_add_default_arguments() -> cst.FunctionDef:
function_body = cst.IndentedBlock(
body=[
cst.parse_statement(
'parser.add_argument("--network", required=True, help="Name of brownie network to connect to")'
),
cst.parse_statement(
'parser.add_argument("--address", required=False, help="Address of deployed contract to connect to")'
),
# TODO(zomglings): The generated code could be confusing for users. Fix this so that it adds additional arguments as part of the "if" statement
cst.If(
test=cst.UnaryOperation(
operator=cst.Not(), expression=cst.Name(value="transact")
),
body=cst.parse_statement("return"),
),
cst.parse_statement(
'parser.add_argument("--sender", required=True, help="Path to keystore file for transaction sender")'
),
cst.parse_statement(
'parser.add_argument("--password", required=False, help="Password to keystore file (if you do not provide it, you will be prompted for it)")'
),
cst.parse_statement(
'parser.add_argument("--gas-price", default=None, help="Gas price at which to submit transaction")'
),
cst.parse_statement(
'parser.add_argument("--confirmations", type=int, default=None, help="Number of confirmations to await before considering a transaction completed")'
),
],
)
function_def = cst.FunctionDef(
name=cst.Name(value="add_default_arguments"),
params=cst.Parameters(
params=[
cst.Param(
name=cst.Name(value="parser"),
annotation=cst.Annotation(
annotation=cst.Attribute(
attr=cst.Name(value="ArgumentParser"),
value=cst.Name(value="argparse"),
)
),
),
cst.Param(
name=cst.Name(value="transact"),
annotation=cst.Annotation(
annotation=cst.Name(value="bool"),
),
),
],
),
body=function_body,
returns=cst.Annotation(annotation=cst.Name(value="None")),
)
return function_def
def generate_cli_generator(
abi: List[Dict[str, Any]], contract_name: Optional[str] = None
) -> cst.FunctionDef:
"""
Generates a generate_cli function that creates a CLI for the generated contract.
"""
if contract_name is None:
contract_name = "generated contract"
statements: List[cst.SimpleStatementLine] = [
cst.parse_statement(
f'parser = argparse.ArgumentParser(description="CLI for {contract_name}")'
),
cst.parse_statement("parser.set_defaults(func=lambda _: parser.print_help())"),
cst.parse_statement("subcommands = parser.add_subparsers()"),
]
for item in abi:
if item["type"] != "function":
continue
spec = function_spec(item)
subparser_statements: List[SimpleStatementLine] = [cst.Newline()]
subparser_name = f'{spec["method"]}_parser'
subparser_statements.append(
cst.parse_statement(
f'{subparser_name} = subcommands.add_parser("{spec["cli"]}")'
)
)
subparser_statements.append(
cst.parse_statement(
f'add_default_arguments({subparser_name}, {spec["transact"]})'
)
)
for param in spec["inputs"]:
call_args = [
cst.Arg(
value=cst.SimpleString(value=f'u"{param["cli"]}"'),
),
cst.Arg(
keyword=cst.Name(value="required"),
value=cst.Name(value="True"),
),
]
if param["type"] is not None:
cst.Arg(
keyword=cst.Name(value="type"),
value=cst.Name(param["type"]),
),
add_argument_call = cst.Call(
func=cst.Attribute(
attr=cst.Name(value="add_argument"),
value=cst.Name(value=subparser_name),
),
args=call_args,
)
add_argument_statement = cst.SimpleStatementLine(
body=[cst.Expr(value=add_argument_call)]
)
subparser_statements.append(add_argument_statement)
subparser_statements.append(
cst.parse_statement(
f"{subparser_name}.set_defaults(func=handle_{spec['method']})"
)
)
subparser_statements.append(cst.Newline())
statements.extend(subparser_statements)
statements.append(cst.parse_statement("return parser"))
function_body = cst.IndentedBlock(body=statements)
function_def = cst.FunctionDef(
name=cst.Name(value="generate_cli"),
params=cst.Parameters(params=[]),
body=function_body,
returns=cst.Annotation(
annotation=cst.Attribute(
attr=cst.Name(value="ArgumentParser"), value=cst.Name(value="argparse")
)
),
)
return function_def
def generate_main() -> cst.FunctionDef:
statements: List[cst.SimpleStatementLine] = [
cst.parse_statement("parser = generate_cli()"),
cst.parse_statement("args = parser.parse_args()"),
cst.parse_statement("args.func(args)"),
]
function_body = cst.IndentedBlock(body=statements)
function_def = cst.FunctionDef(
name=cst.Name(value="main"),
params=cst.Parameters(params=[]),
body=function_body,
returns=cst.Annotation(annotation=cst.Name(value="None")),
)
return function_def
def generate_runner() -> cst.If:
module = cst.parse_module(
"""
if __name__ == "__main__":
main()
"""
)
return module.body[0]
def generate_brownie_cli(
abi: List[Dict[str, Any]], contract_name: str
) -> List[cst.FunctionDef]:
@ -286,7 +553,8 @@ def generate_brownie_cli(
Generates an argparse CLI to a brownie smart contract using the generated smart contract interface.
"""
get_transaction_config_function = generate_get_transaction_config()
handlers = [get_transaction_config_function]
add_default_arguments_function = generate_add_default_arguments()
handlers = [get_transaction_config_function, add_default_arguments_function]
handlers.extend(
[
generate_cli_handler(function_abi, contract_name)
@ -296,6 +564,9 @@ def generate_brownie_cli(
]
)
nodes: List[cst.CSTNode] = [handler for handler in handlers if handler is not None]
nodes.append(generate_cli_generator(abi, contract_name))
nodes.append(generate_main())
nodes.append(generate_runner())
return nodes

Wyświetl plik

@ -2,7 +2,7 @@
# Moonworm version : {moonworm_version}
import argparse
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from eth_typing.evm import ChecksumAddress
import os
import json
@ -13,10 +13,28 @@ from brownie.network.contract import ContractContainer
PROJECT_DIRECTORY = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
BUILD_DIRECTORY = os.path.join(PROJECT_DIRECTORY, "build", "contracts")
PROJECT = project.load(PROJECT_DIRECTORY)
def get_abi_json(abi_name: str) -> List[Dict[str, Any]]:
abi_full_path = os.path.join(BUILD_DIRECTORY, f"{{abi_name}}.json")
if not os.path.isfile(abi_full_path):
raise IOError(
f"File does not exist: {{abi_full_path}}. Maybe you have to compile the smart contracts?"
)
with open(abi_full_path, "r") as ifp:
build = json.load(ifp)
abi_json = build.get("abi")
if abi_json is None:
raise ValueError(f"Could not find ABI definition in: {{abi_full_path}}")
return abi_json
def contract_from_build(abi_name: str) -> ContractContainer:
PROJECT = project.load(PROJECT_DIRECTORY)
abi_full_path = os.path.join(BUILD_DIRECTORY, f"{{abi_name}}.json")
if not os.path.isfile(abi_full_path):
raise IOError(

Wyświetl plik

@ -0,0 +1,76 @@
import unittest
from moonworm.generators.basic import function_spec
class TestFunctionSpec(unittest.TestCase):
def test_function_spec_single_input(self):
function_abi = {
"inputs": [
{"internalType": "uint256", "name": "_tokenId", "type": "uint256"}
],
"name": "getDNA",
"outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
"stateMutability": "view",
"type": "function",
}
expected_spec = {
"abi": "getDNA",
"method": "get_dna",
"cli": "get-dna",
"inputs": [
{
"abi": "_tokenId",
"method": "_token_id",
"cli": "--token-id-arg",
"args": "token_id_arg",
"type": "int",
"cli_type": "int",
},
],
"transact": False,
}
spec = function_spec(function_abi)
self.assertDictEqual(spec, expected_spec)
def test_function_spec_multiple_inputs(self):
function_abi = {
"inputs": [
{"internalType": "address", "name": "owner", "type": "address"},
{"internalType": "uint256", "name": "index", "type": "uint256"},
],
"name": "tokenOfOwnerByIndex",
"outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
"stateMutability": "view",
"type": "function",
}
expected_spec = {
"abi": "tokenOfOwnerByIndex",
"method": "token_of_owner_by_index",
"cli": "token-of-owner-by-index",
"inputs": [
{
"abi": "owner",
"method": "owner",
"cli": "--owner",
"args": "owner",
"type": "ChecksumAddress",
"cli_type": None,
},
{
"abi": "index",
"method": "index",
"cli": "--index",
"args": "index",
"type": "int",
"cli_type": "int",
},
],
"transact": False,
}
spec = function_spec(function_abi)
self.assertDictEqual(spec, expected_spec)
if __name__ == "__main__":
unittest.main()

Wyświetl plik

@ -1,4 +1,4 @@
[mypy]
python_version = 3.8
ignore_missing_imports = True
exclude = tests|crawler
exclude = tests|crawler|moonworm.generators.brownie

Wyświetl plik

@ -13,6 +13,7 @@ setup(
package_data={"moonworm": ["py.typed"]},
install_requires=[
"black",
"inflection",
"libcst",
"moonstreamdb",
"pysha3<2.0.0,>=1.0.0",