diff --git a/moonworm/generators/basic.py b/moonworm/generators/basic.py index 4ff14ad..7efc09e 100644 --- a/moonworm/generators/basic.py +++ b/moonworm/generators/basic.py @@ -1,3 +1,5 @@ +from collections import defaultdict +import copy import keyword import logging import os @@ -5,6 +7,7 @@ from typing import Any, Dict, List, Sequence, Union, cast import black import black.mode +import inflection import libcst as cst from ..version import MOONWORM_VERSION @@ -34,29 +37,33 @@ def format_code(code: str) -> str: def make_annotation(types: list, optional: bool = False): - if len(types) == 1: - return cst.Annotation(annotation=cst.Name(types[0])) - union_slice = [] - for _type in types: - union_slice.append( - cst.SubscriptElement( - slice=cst.Index( - value=cst.Name(_type), - ) - ), + annotation = cst.Annotation(annotation=cst.Name(types[0])) + if len(types) > 1: + union_slice = [] + for _type in types: + union_slice.append( + cst.SubscriptElement( + slice=cst.Index( + value=cst.Name(_type), + ) + ), + ) + annotation = cst.Annotation( + annotation=cst.Subscript(value=cst.Name("Union"), slice=union_slice) ) - annotation = cst.Annotation( - annotation=cst.Subscript(value=cst.Name("Union"), slice=union_slice) - ) if optional: annotation = cst.Annotation( annotation=cst.Subscript( value=cst.Name("Optional"), - slice=cast(Sequence[cst.SubscriptElement], annotation), + slice=[ + cst.SubscriptElement(slice=cst.Index(value=annotation.annotation)) + ], ) ) + return annotation + def normalize_abi_name(name: str) -> str: if keyword.iskeyword(name): @@ -136,6 +143,104 @@ def generate_contract_class( ) +def function_spec(function_abi: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + """ + Accepts function interface definitions from smart contract ABIs. An example input: + { + "inputs": [ + { + "internalType": "uint256", + "name": "_tokenId", + "type": "uint256" + } + ], + "name": "getDNA", + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view", + "type": "function" + } + + Returns an dictionary of the form: + { + "abi": "getDNA", + "method": "get_dna", + "cli": "get-dna", + "inputs": [ + { + "abi": "_tokenId", + "method": "_tokenId", + "cli": "--token-id", + "args": "token_id", + "type": int, + "cli_type": int, + }, + ], + "transact": False, + } + """ + abi_name = function_abi.get("name") + if abi_name is None: + raise ValueError('function_spec -- Valid function ABI must have a "name" field') + + underscored_name = inflection.underscore(abi_name) + function_name = normalize_abi_name(underscored_name) + cli_name = inflection.dasherize(underscored_name) + + default_input_name = "arg" + default_counter = 1 + + inputs: List[Dict[str, Any]] = [] + for item in function_abi.get("inputs", []): + item_abi_name = item.get("name") + if not item_abi_name: + item_abi_name = f"{default_input_name}{default_counter}" + default_counter += 1 + + item_method_name = normalize_abi_name(inflection.underscore(item_abi_name)) + item_args_name = item_method_name + if item_args_name.startswith("_") or item_args_name.endswith("_"): + item_args_name = item_args_name.strip("_") + "_arg" + + item_cli_name = f"--{inflection.dasherize(item_args_name)}" + + item_type = python_type(item["type"])[0] + + item_cli_type = None + if item_type in {"int", "bool"}: + item_cli_type = item_type + + input_spec: Dict[str, Any] = { + "abi": item_abi_name, + "method": item_method_name, + "cli": item_cli_name, + "args": item_args_name, + "type": item_type, + "cli_type": item_cli_type, + } + + inputs.append(input_spec) + + transact = True + if function_abi.get("stateMutability") == "view": + transact = False + + spec = { + "abi": abi_name, + "method": function_name, + "cli": cli_name, + "inputs": inputs, + "transact": transact, + } + + return spec + + def generate_contract_constructor_function( func_object: Dict[str, Any] ) -> cst.FunctionDef: diff --git a/moonworm/generators/brownie.py b/moonworm/generators/brownie.py index b4dd19d..e048d3e 100644 --- a/moonworm/generators/brownie.py +++ b/moonworm/generators/brownie.py @@ -3,9 +3,14 @@ import os from typing import Any, Dict, List, Optional import libcst as cst +from libcst._nodes.statement import SimpleStatementLine from ..version import MOONWORM_VERSION -from .basic import format_code, make_annotation, normalize_abi_name, python_type +from .basic import ( + format_code, + function_spec, + make_annotation, +) BROWNIE_INTERFACE_TEMPLATE_PATH = os.path.join( os.path.dirname(__file__), "brownie_contract.py.template" @@ -29,9 +34,25 @@ def generate_brownie_contract_class( name=cst.Name("__init__"), body=cst.IndentedBlock( body=[ + cst.parse_statement(f'self.contract_name = "{contract_name}"'), cst.parse_statement("self.address = contract_address"), - cst.parse_statement( - f"self.contract = contract_from_build({contract_name})" + cst.parse_statement("self.contract = None"), + cst.parse_statement(f'self.abi = get_abi_json("{contract_name}")'), + cst.If( + test=cst.Comparison( + left=cst.Attribute( + attr=cst.Name(value="address"), + value=cst.Name(value="self"), + ), + comparisons=[ + cst.ComparisonTarget( + operator=cst.IsNot(), comparator=cst.Name(value="None") + ) + ], + ), + body=cst.parse_statement( + "self.contract: Optional[Contract] = Contract.from_abi(self.contract_name, self.address, self.abi)" + ), ), ] ), @@ -57,7 +78,10 @@ def generate_brownie_contract_class( contract_constructor["name"] = "constructor" class_functions = ( [class_constructor] - + [generate_brownie_constructor_function(contract_constructor)] + + [ + generate_brownie_constructor_function(contract_constructor), + generate_assert_contract_is_instantiated(), + ] + [ generate_brownie_contract_function(function) for function in abi @@ -72,37 +96,34 @@ def generate_brownie_contract_class( def generate_brownie_constructor_function( func_object: Dict[str, Any] ) -> cst.FunctionDef: - - default_param_name = "arg" - default_counter = 1 + spec = function_spec(func_object) func_params = [] func_params.append(cst.Param(name=cst.Name("self"))) param_names = [] - for param in func_object["inputs"]: - param_name = normalize_abi_name(param["name"]) - if param_name == "": - param_name = f"{default_param_name}{default_counter}" - default_counter += 1 - param_type = make_annotation(python_type(param["type"])) - param_names.append(param_name) + for param in spec["inputs"]: + param_type = make_annotation([param["type"]]) + param_names.append(param["method"]) func_params.append( cst.Param( - name=cst.Name(value=param_name), + name=cst.Name(value=param["method"]), annotation=param_type, ) ) - func_params.append(cst.Param(name=cst.Name("signer"))) + func_params.append(cst.Param(name=cst.Name("transaction_config"))) func_name = "deploy" - param_names.append("{'from': signer}") - proxy_call_code = ( - f"deployed_contract = self.contract.deploy({','.join(param_names)})" - ) + param_names.append("transaction_config") func_body = cst.IndentedBlock( body=[ - cst.parse_statement(proxy_call_code), - cst.parse_statement("self.address=deployed_contract.address"), + cst.parse_statement( + f"contract_class = contract_from_build(self.contract_name)" + ), + cst.parse_statement( + f"deployed_contract = contract_class.deploy({','.join(param_names)})" + ), + cst.parse_statement("self.address = deployed_contract.address"), + cst.parse_statement("self.contract = deployed_contract"), ] ) @@ -113,20 +134,46 @@ def generate_brownie_constructor_function( ) -def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.FunctionDef: +def generate_assert_contract_is_instantiated() -> cst.FunctionDef: + function_body = cst.IndentedBlock( + body=[ + cst.If( + test=cst.Comparison( + left=cst.Attribute( + attr=cst.Name(value="contract"), value=cst.Name(value="self") + ), + comparisons=[ + cst.ComparisonTarget( + operator=cst.Is(), comparator=cst.Name(value="None") + ) + ], + ), + body=cst.parse_statement( + 'raise Exception("contract has not been instantiated")' + ), + ), + ], + ) + function_def = cst.FunctionDef( + name=cst.Name(value="assert_contract_is_instantiated"), + params=cst.Parameters( + params=[cst.Param(name=cst.Name(value="self"))], + ), + body=function_body, + returns=cst.Annotation(annotation=cst.Name(value="None")), + ) + return function_def - default_param_name = "arg" - default_counter = 1 + +def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.FunctionDef: + spec = function_spec(func_object) func_params = [] func_params.append(cst.Param(name=cst.Name("self"))) param_names = [] - for param in func_object["inputs"]: - param_name = normalize_abi_name(param["name"]) - if param_name == "": - param_name = f"{default_param_name}{default_counter}" - default_counter += 1 - param_type = make_annotation(python_type(param["type"])) + for param in spec["inputs"]: + param_type = make_annotation([param["type"]]) + param_name = param["method"] param_names.append(param_name) func_params.append( cst.Param( @@ -135,19 +182,26 @@ def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.Funct ) ) - func_raw_name = normalize_abi_name(func_object["name"]) - func_name = cst.Name(func_raw_name) - if func_object["stateMutability"] == "view": - proxy_call_code = ( - f"return self.contract.{func_raw_name}.call({','.join(param_names)})" - ) - else: - func_params.append(cst.Param(name=cst.Name(value="signer"))) - param_names.append(f"{{'from': signer}}") + func_raw_name = spec["abi"] + func_python_name = spec["method"] + func_name = cst.Name(value=func_python_name) + if spec["transact"]: + func_params.append(cst.Param(name=cst.Name(value="transaction_config"))) + param_names.append("transaction_config") proxy_call_code = ( f"return self.contract.{func_raw_name}({','.join(param_names)})" ) - func_body = cst.IndentedBlock(body=[cst.parse_statement(proxy_call_code)]) + else: + proxy_call_code = ( + f"return self.contract.{func_raw_name}.call({','.join(param_names)})" + ) + + func_body = cst.IndentedBlock( + body=[ + cst.parse_statement("self.assert_contract_is_instantiated()"), + cst.parse_statement(proxy_call_code), + ] + ) func_returns = cst.Annotation(annotation=cst.Name(value="Any")) return cst.FunctionDef( @@ -161,7 +215,6 @@ def generate_brownie_contract_function(func_object: Dict[str, Any]) -> cst.Funct def generate_get_transaction_config() -> cst.FunctionDef: function_body = cst.IndentedBlock( body=[ - cst.parse_statement("network.connect(args.network)"), cst.parse_statement( "signer = network.accounts.load(args.sender, args.password)" ), @@ -199,6 +252,7 @@ def generate_get_transaction_config() -> cst.FunctionDef: 'transaction_config["required_confs"] = args.confirmations' ), ), + cst.parse_statement("return transaction_config"), ], ) function_def = cst.FunctionDef( @@ -240,12 +294,20 @@ def generate_cli_handler( Returns None if it is not appropriate for the given function to have a handler (e.g. fallback or receive). constructor is handled separately with a deploy handler. """ - function_name = function_abi.get("name") - if function_name is None: - return None + spec = function_spec(function_abi) + function_name = spec["method"] function_body_raw: List[cst.CSTNode] = [] + # Instantiate the contract + function_body_raw.extend( + [ + cst.parse_statement("network.connect(args.network)"), + cst.parse_statement(f"contract = {contract_name}(args.address)"), + ] + ) + + # If a transaction is required, extract transaction parameters from CLI requires_transaction = True if function_abi["stateMutability"] == "view": requires_transaction = False @@ -254,7 +316,43 @@ def generate_cli_handler( function_body_raw.append( cst.parse_statement("transaction_config = get_transaction_config(args)") ) - function_body_raw.append(cst.parse_statement("pass")) + + # Call contract method + call_args: List[cst.Arg] = [] + for param in spec["inputs"]: + call_args.append( + cst.Arg( + keyword=cst.Name(value=param["method"]), + value=cst.Attribute( + attr=cst.Name(value=param["args"]), value=cst.Name(value="args") + ), + ) + ) + if requires_transaction: + call_args.append( + cst.Arg( + keyword=cst.Name(value="transaction_config"), + value=cst.Name(value="transaction_config"), + ) + ) + method_call = cst.Call( + func=cst.Attribute( + attr=cst.Name(value=spec["method"]), + value=cst.Name(value="contract"), + ), + args=call_args, + ) + method_call_result_statement = cst.SimpleStatementLine( + body=[ + cst.Assign( + targets=[cst.AssignTarget(target=cst.Name(value="result"))], + value=method_call, + ) + ] + ) + function_body_raw.append(method_call_result_statement) + + function_body_raw.append(cst.parse_statement("print(result)")) function_body = cst.IndentedBlock(body=function_body_raw) @@ -279,6 +377,175 @@ def generate_cli_handler( return function_def +def generate_add_default_arguments() -> cst.FunctionDef: + function_body = cst.IndentedBlock( + body=[ + cst.parse_statement( + 'parser.add_argument("--network", required=True, help="Name of brownie network to connect to")' + ), + cst.parse_statement( + 'parser.add_argument("--address", required=False, help="Address of deployed contract to connect to")' + ), + # TODO(zomglings): The generated code could be confusing for users. Fix this so that it adds additional arguments as part of the "if" statement + cst.If( + test=cst.UnaryOperation( + operator=cst.Not(), expression=cst.Name(value="transact") + ), + body=cst.parse_statement("return"), + ), + cst.parse_statement( + 'parser.add_argument("--sender", required=True, help="Path to keystore file for transaction sender")' + ), + cst.parse_statement( + 'parser.add_argument("--password", required=False, help="Password to keystore file (if you do not provide it, you will be prompted for it)")' + ), + cst.parse_statement( + 'parser.add_argument("--gas-price", default=None, help="Gas price at which to submit transaction")' + ), + cst.parse_statement( + 'parser.add_argument("--confirmations", type=int, default=None, help="Number of confirmations to await before considering a transaction completed")' + ), + ], + ) + function_def = cst.FunctionDef( + name=cst.Name(value="add_default_arguments"), + params=cst.Parameters( + params=[ + cst.Param( + name=cst.Name(value="parser"), + annotation=cst.Annotation( + annotation=cst.Attribute( + attr=cst.Name(value="ArgumentParser"), + value=cst.Name(value="argparse"), + ) + ), + ), + cst.Param( + name=cst.Name(value="transact"), + annotation=cst.Annotation( + annotation=cst.Name(value="bool"), + ), + ), + ], + ), + body=function_body, + returns=cst.Annotation(annotation=cst.Name(value="None")), + ) + return function_def + + +def generate_cli_generator( + abi: List[Dict[str, Any]], contract_name: Optional[str] = None +) -> cst.FunctionDef: + """ + Generates a generate_cli function that creates a CLI for the generated contract. + """ + if contract_name is None: + contract_name = "generated contract" + statements: List[cst.SimpleStatementLine] = [ + cst.parse_statement( + f'parser = argparse.ArgumentParser(description="CLI for {contract_name}")' + ), + cst.parse_statement("parser.set_defaults(func=lambda _: parser.print_help())"), + cst.parse_statement("subcommands = parser.add_subparsers()"), + ] + for item in abi: + if item["type"] != "function": + continue + spec = function_spec(item) + subparser_statements: List[SimpleStatementLine] = [cst.Newline()] + + subparser_name = f'{spec["method"]}_parser' + + subparser_statements.append( + cst.parse_statement( + f'{subparser_name} = subcommands.add_parser("{spec["cli"]}")' + ) + ) + subparser_statements.append( + cst.parse_statement( + f'add_default_arguments({subparser_name}, {spec["transact"]})' + ) + ) + + for param in spec["inputs"]: + call_args = [ + cst.Arg( + value=cst.SimpleString(value=f'u"{param["cli"]}"'), + ), + cst.Arg( + keyword=cst.Name(value="required"), + value=cst.Name(value="True"), + ), + ] + if param["type"] is not None: + cst.Arg( + keyword=cst.Name(value="type"), + value=cst.Name(param["type"]), + ), + + add_argument_call = cst.Call( + func=cst.Attribute( + attr=cst.Name(value="add_argument"), + value=cst.Name(value=subparser_name), + ), + args=call_args, + ) + add_argument_statement = cst.SimpleStatementLine( + body=[cst.Expr(value=add_argument_call)] + ) + subparser_statements.append(add_argument_statement) + + subparser_statements.append( + cst.parse_statement( + f"{subparser_name}.set_defaults(func=handle_{spec['method']})" + ) + ) + subparser_statements.append(cst.Newline()) + statements.extend(subparser_statements) + + statements.append(cst.parse_statement("return parser")) + + function_body = cst.IndentedBlock(body=statements) + function_def = cst.FunctionDef( + name=cst.Name(value="generate_cli"), + params=cst.Parameters(params=[]), + body=function_body, + returns=cst.Annotation( + annotation=cst.Attribute( + attr=cst.Name(value="ArgumentParser"), value=cst.Name(value="argparse") + ) + ), + ) + return function_def + + +def generate_main() -> cst.FunctionDef: + statements: List[cst.SimpleStatementLine] = [ + cst.parse_statement("parser = generate_cli()"), + cst.parse_statement("args = parser.parse_args()"), + cst.parse_statement("args.func(args)"), + ] + function_body = cst.IndentedBlock(body=statements) + function_def = cst.FunctionDef( + name=cst.Name(value="main"), + params=cst.Parameters(params=[]), + body=function_body, + returns=cst.Annotation(annotation=cst.Name(value="None")), + ) + return function_def + + +def generate_runner() -> cst.If: + module = cst.parse_module( + """ +if __name__ == "__main__": + main() + """ + ) + return module.body[0] + + def generate_brownie_cli( abi: List[Dict[str, Any]], contract_name: str ) -> List[cst.FunctionDef]: @@ -286,7 +553,8 @@ def generate_brownie_cli( Generates an argparse CLI to a brownie smart contract using the generated smart contract interface. """ get_transaction_config_function = generate_get_transaction_config() - handlers = [get_transaction_config_function] + add_default_arguments_function = generate_add_default_arguments() + handlers = [get_transaction_config_function, add_default_arguments_function] handlers.extend( [ generate_cli_handler(function_abi, contract_name) @@ -296,6 +564,9 @@ def generate_brownie_cli( ] ) nodes: List[cst.CSTNode] = [handler for handler in handlers if handler is not None] + nodes.append(generate_cli_generator(abi, contract_name)) + nodes.append(generate_main()) + nodes.append(generate_runner()) return nodes diff --git a/moonworm/generators/brownie_contract.py.template b/moonworm/generators/brownie_contract.py.template index 92eeaae..4586b9b 100644 --- a/moonworm/generators/brownie_contract.py.template +++ b/moonworm/generators/brownie_contract.py.template @@ -2,7 +2,7 @@ # Moonworm version : {moonworm_version} import argparse -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from eth_typing.evm import ChecksumAddress import os import json @@ -13,10 +13,28 @@ from brownie.network.contract import ContractContainer PROJECT_DIRECTORY = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) BUILD_DIRECTORY = os.path.join(PROJECT_DIRECTORY, "build", "contracts") -PROJECT = project.load(PROJECT_DIRECTORY) + + +def get_abi_json(abi_name: str) -> List[Dict[str, Any]]: + abi_full_path = os.path.join(BUILD_DIRECTORY, f"{{abi_name}}.json") + if not os.path.isfile(abi_full_path): + raise IOError( + f"File does not exist: {{abi_full_path}}. Maybe you have to compile the smart contracts?" + ) + + with open(abi_full_path, "r") as ifp: + build = json.load(ifp) + + abi_json = build.get("abi") + if abi_json is None: + raise ValueError(f"Could not find ABI definition in: {{abi_full_path}}") + + return abi_json def contract_from_build(abi_name: str) -> ContractContainer: + PROJECT = project.load(PROJECT_DIRECTORY) + abi_full_path = os.path.join(BUILD_DIRECTORY, f"{{abi_name}}.json") if not os.path.isfile(abi_full_path): raise IOError( diff --git a/moonworm/tests/generators/__init__.py b/moonworm/tests/generators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/moonworm/tests/generators/test_basic.py b/moonworm/tests/generators/test_basic.py new file mode 100644 index 0000000..d2ed0f8 --- /dev/null +++ b/moonworm/tests/generators/test_basic.py @@ -0,0 +1,76 @@ +import unittest + +from moonworm.generators.basic import function_spec + + +class TestFunctionSpec(unittest.TestCase): + def test_function_spec_single_input(self): + function_abi = { + "inputs": [ + {"internalType": "uint256", "name": "_tokenId", "type": "uint256"} + ], + "name": "getDNA", + "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], + "stateMutability": "view", + "type": "function", + } + expected_spec = { + "abi": "getDNA", + "method": "get_dna", + "cli": "get-dna", + "inputs": [ + { + "abi": "_tokenId", + "method": "_token_id", + "cli": "--token-id-arg", + "args": "token_id_arg", + "type": "int", + "cli_type": "int", + }, + ], + "transact": False, + } + spec = function_spec(function_abi) + self.assertDictEqual(spec, expected_spec) + + def test_function_spec_multiple_inputs(self): + function_abi = { + "inputs": [ + {"internalType": "address", "name": "owner", "type": "address"}, + {"internalType": "uint256", "name": "index", "type": "uint256"}, + ], + "name": "tokenOfOwnerByIndex", + "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], + "stateMutability": "view", + "type": "function", + } + expected_spec = { + "abi": "tokenOfOwnerByIndex", + "method": "token_of_owner_by_index", + "cli": "token-of-owner-by-index", + "inputs": [ + { + "abi": "owner", + "method": "owner", + "cli": "--owner", + "args": "owner", + "type": "ChecksumAddress", + "cli_type": None, + }, + { + "abi": "index", + "method": "index", + "cli": "--index", + "args": "index", + "type": "int", + "cli_type": "int", + }, + ], + "transact": False, + } + spec = function_spec(function_abi) + self.assertDictEqual(spec, expected_spec) + + +if __name__ == "__main__": + unittest.main() diff --git a/mypy.ini b/mypy.ini index 25c7845..89a9b68 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,4 +1,4 @@ [mypy] python_version = 3.8 ignore_missing_imports = True -exclude = tests|crawler +exclude = tests|crawler|moonworm.generators.brownie diff --git a/setup.py b/setup.py index 8179572..ad988cb 100644 --- a/setup.py +++ b/setup.py @@ -13,6 +13,7 @@ setup( package_data={"moonworm": ["py.typed"]}, install_requires=[ "black", + "inflection", "libcst", "moonstreamdb", "pysha3<2.0.0,>=1.0.0",