Terminus functionality in "dao" cli

pull/9/head
Neeraj Kashyap 2021-12-18 12:21:38 -08:00
rodzic 7b3bd5d255
commit dd986d36b3
4 zmienionych plików z 705 dodań i 1 usunięć

Wyświetl plik

@ -0,0 +1,509 @@
# Code generated by moonworm : https://github.com/bugout-dev/moonworm
# Moonworm version : 0.1.8
import argparse
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from brownie import Contract, network, project
from brownie.network.contract import ContractContainer
from eth_typing.evm import ChecksumAddress
PROJECT_DIRECTORY = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
BUILD_DIRECTORY = os.path.join(PROJECT_DIRECTORY, "build", "contracts")
def boolean_argument_type(raw_value: str) -> bool:
TRUE_VALUES = ["1", "t", "y", "true", "yes"]
FALSE_VALUES = ["0", "f", "n", "false", "no"]
if raw_value.lower() in TRUE_VALUES:
return True
elif raw_value.lower() in FALSE_VALUES:
return False
raise ValueError(
f"Invalid boolean argument: {raw_value}. Value must be one of: {','.join(TRUE_VALUES + FALSE_VALUES)}"
)
def bytes_argument_type(raw_value: str) -> bytes:
return raw_value.encode()
def get_abi_json(abi_name: str) -> List[Dict[str, Any]]:
abi_full_path = os.path.join(BUILD_DIRECTORY, f"{abi_name}.json")
if not os.path.isfile(abi_full_path):
raise IOError(
f"File does not exist: {abi_full_path}. Maybe you have to compile the smart contracts?"
)
with open(abi_full_path, "r") as ifp:
build = json.load(ifp)
abi_json = build.get("abi")
if abi_json is None:
raise ValueError(f"Could not find ABI definition in: {abi_full_path}")
return abi_json
def contract_from_build(abi_name: str) -> ContractContainer:
# This is workaround because brownie currently doesn't support loading the same project multiple
# times. This causes problems when using multiple contracts from the same project in the same
# python project.
PROJECT = project.main.Project("moonworm", Path(PROJECT_DIRECTORY))
abi_full_path = os.path.join(BUILD_DIRECTORY, f"{abi_name}.json")
if not os.path.isfile(abi_full_path):
raise IOError(
f"File does not exist: {abi_full_path}. Maybe you have to compile the smart contracts?"
)
with open(abi_full_path, "r") as ifp:
build = json.load(ifp)
return ContractContainer(PROJECT, build)
class TerminusFacet:
def __init__(self, contract_address: Optional[ChecksumAddress]):
self.contract_name = "TerminusFacet"
self.address = contract_address
self.contract = None
self.abi = get_abi_json("TerminusFacet")
if self.address is not None:
self.contract: Optional[Contract] = Contract.from_abi(
self.contract_name, self.address, self.abi
)
def deploy(self, transaction_config):
contract_class = contract_from_build(self.contract_name)
deployed_contract = contract_class.deploy(transaction_config)
self.address = deployed_contract.address
self.contract = deployed_contract
def assert_contract_is_instantiated(self) -> None:
if self.contract is None:
raise Exception("contract has not been instantiated")
def balance_of(self, account: ChecksumAddress, id: int) -> Any:
self.assert_contract_is_instantiated()
return self.contract.balanceOf.call(account, id)
def balance_of_batch(self, accounts: List, ids: List) -> Any:
self.assert_contract_is_instantiated()
return self.contract.balanceOfBatch.call(accounts, ids)
def create_pool(self, transaction_config) -> Any:
self.assert_contract_is_instantiated()
return self.contract.createPool(transaction_config)
def is_approved_for_all(
self, account: ChecksumAddress, operator: ChecksumAddress
) -> Any:
self.assert_contract_is_instantiated()
return self.contract.isApprovedForAll.call(account, operator)
def mint(
self,
to: ChecksumAddress,
pool_id: int,
amount: int,
data: bytes,
transaction_config,
) -> Any:
self.assert_contract_is_instantiated()
return self.contract.mint(to, pool_id, amount, data, transaction_config)
def mint_batch(
self,
to: ChecksumAddress,
pool_i_ds: List,
amounts: List,
data: bytes,
transaction_config,
) -> Any:
self.assert_contract_is_instantiated()
return self.contract.mintBatch(to, pool_i_ds, amounts, data, transaction_config)
def safe_batch_transfer_from(
self,
from_: ChecksumAddress,
to: ChecksumAddress,
ids: List,
amounts: List,
data: bytes,
transaction_config,
) -> Any:
self.assert_contract_is_instantiated()
return self.contract.safeBatchTransferFrom(
from_, to, ids, amounts, data, transaction_config
)
def safe_transfer_from(
self,
from_: ChecksumAddress,
to: ChecksumAddress,
id: int,
amount: int,
data: bytes,
transaction_config,
) -> Any:
self.assert_contract_is_instantiated()
return self.contract.safeTransferFrom(
from_, to, id, amount, data, transaction_config
)
def set_approval_for_all(
self, operator: ChecksumAddress, approved: bool, transaction_config
) -> Any:
self.assert_contract_is_instantiated()
return self.contract.setApprovalForAll(operator, approved, transaction_config)
def set_uri(self, pool_id: int, pool_uri: str, transaction_config) -> Any:
self.assert_contract_is_instantiated()
return self.contract.setURI(pool_id, pool_uri, transaction_config)
def supports_interface(self, interface_id: bytes) -> Any:
self.assert_contract_is_instantiated()
return self.contract.supportsInterface.call(interface_id)
def total_pools(self) -> Any:
self.assert_contract_is_instantiated()
return self.contract.totalPools.call()
def uri(self, pool_id: int) -> Any:
self.assert_contract_is_instantiated()
return self.contract.uri.call(pool_id)
def get_transaction_config(args: argparse.Namespace) -> Dict[str, Any]:
signer = network.accounts.load(args.sender, args.password)
transaction_config: Dict[str, Any] = {"from": signer}
if args.gas_price is not None:
transaction_config["gas_price"] = args.gas_price
if args.confirmations is not None:
transaction_config["required_confs"] = args.confirmations
return transaction_config
def add_default_arguments(parser: argparse.ArgumentParser, transact: bool) -> None:
parser.add_argument(
"--network", required=True, help="Name of brownie network to connect to"
)
parser.add_argument(
"--address", required=False, help="Address of deployed contract to connect to"
)
if not transact:
return
parser.add_argument(
"--sender", required=True, help="Path to keystore file for transaction sender"
)
parser.add_argument(
"--password",
required=False,
help="Password to keystore file (if you do not provide it, you will be prompted for it)",
)
parser.add_argument(
"--gas-price", default=None, help="Gas price at which to submit transaction"
)
parser.add_argument(
"--confirmations",
type=int,
default=None,
help="Number of confirmations to await before considering a transaction completed",
)
def handle_deploy(args: argparse.Namespace) -> None:
network.connect(args.network)
transaction_config = get_transaction_config(args)
contract = TerminusFacet(None)
result = contract.deploy(transaction_config=transaction_config)
print(result)
def handle_balance_of(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
result = contract.balance_of(account=args.account, id=args.id)
print(result)
def handle_balance_of_batch(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
result = contract.balance_of_batch(accounts=args.accounts, ids=args.ids)
print(result)
def handle_create_pool(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
transaction_config = get_transaction_config(args)
result = contract.create_pool(transaction_config=transaction_config)
print(result)
def handle_is_approved_for_all(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
result = contract.is_approved_for_all(account=args.account, operator=args.operator)
print(result)
def handle_mint(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
transaction_config = get_transaction_config(args)
result = contract.mint(
to=args.to,
pool_id=args.pool_id,
amount=args.amount,
data=args.data,
transaction_config=transaction_config,
)
print(result)
def handle_mint_batch(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
transaction_config = get_transaction_config(args)
result = contract.mint_batch(
to=args.to,
pool_i_ds=args.pool_i_ds,
amounts=args.amounts,
data=args.data,
transaction_config=transaction_config,
)
print(result)
def handle_safe_batch_transfer_from(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
transaction_config = get_transaction_config(args)
result = contract.safe_batch_transfer_from(
from_=args.from_arg,
to=args.to,
ids=args.ids,
amounts=args.amounts,
data=args.data,
transaction_config=transaction_config,
)
print(result)
def handle_safe_transfer_from(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
transaction_config = get_transaction_config(args)
result = contract.safe_transfer_from(
from_=args.from_arg,
to=args.to,
id=args.id,
amount=args.amount,
data=args.data,
transaction_config=transaction_config,
)
print(result)
def handle_set_approval_for_all(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
transaction_config = get_transaction_config(args)
result = contract.set_approval_for_all(
operator=args.operator,
approved=args.approved,
transaction_config=transaction_config,
)
print(result)
def handle_set_uri(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
transaction_config = get_transaction_config(args)
result = contract.set_uri(
pool_id=args.pool_id,
pool_uri=args.pool_uri,
transaction_config=transaction_config,
)
print(result)
def handle_supports_interface(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
result = contract.supports_interface(interface_id=args.interface_id)
print(result)
def handle_total_pools(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
result = contract.total_pools()
print(result)
def handle_uri(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusFacet(args.address)
result = contract.uri(pool_id=args.pool_id)
print(result)
def generate_cli() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="CLI for TerminusFacet")
parser.set_defaults(func=lambda _: parser.print_help())
subcommands = parser.add_subparsers()
deploy_parser = subcommands.add_parser("deploy")
add_default_arguments(deploy_parser, True)
deploy_parser.set_defaults(func=handle_deploy)
balance_of_parser = subcommands.add_parser("balance-of")
add_default_arguments(balance_of_parser, False)
balance_of_parser.add_argument("--account", required=True, help="Type: address")
balance_of_parser.add_argument(
"--id", required=True, help="Type: uint256", type=int
)
balance_of_parser.set_defaults(func=handle_balance_of)
balance_of_batch_parser = subcommands.add_parser("balance-of-batch")
add_default_arguments(balance_of_batch_parser, False)
balance_of_batch_parser.add_argument(
"--accounts", required=True, help="Type: address[]", nargs="+"
)
balance_of_batch_parser.add_argument(
"--ids", required=True, help="Type: uint256[]", nargs="+"
)
balance_of_batch_parser.set_defaults(func=handle_balance_of_batch)
create_pool_parser = subcommands.add_parser("create-pool")
add_default_arguments(create_pool_parser, True)
create_pool_parser.set_defaults(func=handle_create_pool)
is_approved_for_all_parser = subcommands.add_parser("is-approved-for-all")
add_default_arguments(is_approved_for_all_parser, False)
is_approved_for_all_parser.add_argument(
"--account", required=True, help="Type: address"
)
is_approved_for_all_parser.add_argument(
"--operator", required=True, help="Type: address"
)
is_approved_for_all_parser.set_defaults(func=handle_is_approved_for_all)
mint_parser = subcommands.add_parser("mint")
add_default_arguments(mint_parser, True)
mint_parser.add_argument("--to", required=True, help="Type: address")
mint_parser.add_argument("--pool-id", required=True, help="Type: uint256", type=int)
mint_parser.add_argument("--amount", required=True, help="Type: uint256", type=int)
mint_parser.add_argument(
"--data", required=True, help="Type: bytes", type=bytes_argument_type
)
mint_parser.set_defaults(func=handle_mint)
mint_batch_parser = subcommands.add_parser("mint-batch")
add_default_arguments(mint_batch_parser, True)
mint_batch_parser.add_argument("--to", required=True, help="Type: address")
mint_batch_parser.add_argument(
"--pool-i-ds", required=True, help="Type: uint256[]", nargs="+"
)
mint_batch_parser.add_argument(
"--amounts", required=True, help="Type: uint256[]", nargs="+"
)
mint_batch_parser.add_argument(
"--data", required=True, help="Type: bytes", type=bytes_argument_type
)
mint_batch_parser.set_defaults(func=handle_mint_batch)
safe_batch_transfer_from_parser = subcommands.add_parser("safe-batch-transfer-from")
add_default_arguments(safe_batch_transfer_from_parser, True)
safe_batch_transfer_from_parser.add_argument(
"--from-arg", required=True, help="Type: address"
)
safe_batch_transfer_from_parser.add_argument(
"--to", required=True, help="Type: address"
)
safe_batch_transfer_from_parser.add_argument(
"--ids", required=True, help="Type: uint256[]", nargs="+"
)
safe_batch_transfer_from_parser.add_argument(
"--amounts", required=True, help="Type: uint256[]", nargs="+"
)
safe_batch_transfer_from_parser.add_argument(
"--data", required=True, help="Type: bytes", type=bytes_argument_type
)
safe_batch_transfer_from_parser.set_defaults(func=handle_safe_batch_transfer_from)
safe_transfer_from_parser = subcommands.add_parser("safe-transfer-from")
add_default_arguments(safe_transfer_from_parser, True)
safe_transfer_from_parser.add_argument(
"--from-arg", required=True, help="Type: address"
)
safe_transfer_from_parser.add_argument("--to", required=True, help="Type: address")
safe_transfer_from_parser.add_argument(
"--id", required=True, help="Type: uint256", type=int
)
safe_transfer_from_parser.add_argument(
"--amount", required=True, help="Type: uint256", type=int
)
safe_transfer_from_parser.add_argument(
"--data", required=True, help="Type: bytes", type=bytes_argument_type
)
safe_transfer_from_parser.set_defaults(func=handle_safe_transfer_from)
set_approval_for_all_parser = subcommands.add_parser("set-approval-for-all")
add_default_arguments(set_approval_for_all_parser, True)
set_approval_for_all_parser.add_argument(
"--operator", required=True, help="Type: address"
)
set_approval_for_all_parser.add_argument(
"--approved", required=True, help="Type: bool", type=boolean_argument_type
)
set_approval_for_all_parser.set_defaults(func=handle_set_approval_for_all)
set_uri_parser = subcommands.add_parser("set-uri")
add_default_arguments(set_uri_parser, True)
set_uri_parser.add_argument(
"--pool-id", required=True, help="Type: uint256", type=int
)
set_uri_parser.add_argument(
"--pool-uri", required=True, help="Type: string", type=str
)
set_uri_parser.set_defaults(func=handle_set_uri)
supports_interface_parser = subcommands.add_parser("supports-interface")
add_default_arguments(supports_interface_parser, False)
supports_interface_parser.add_argument(
"--interface-id", required=True, help="Type: bytes4", type=bytes_argument_type
)
supports_interface_parser.set_defaults(func=handle_supports_interface)
total_pools_parser = subcommands.add_parser("total-pools")
add_default_arguments(total_pools_parser, False)
total_pools_parser.set_defaults(func=handle_total_pools)
uri_parser = subcommands.add_parser("uri")
add_default_arguments(uri_parser, False)
uri_parser.add_argument("--pool-id", required=True, help="Type: uint256", type=int)
uri_parser.set_defaults(func=handle_uri)
return parser
def main() -> None:
parser = generate_cli()
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,175 @@
# Code generated by moonworm : https://github.com/bugout-dev/moonworm
# Moonworm version : 0.1.8
import argparse
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from brownie import Contract, network, project
from brownie.network.contract import ContractContainer
from eth_typing.evm import ChecksumAddress
PROJECT_DIRECTORY = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
BUILD_DIRECTORY = os.path.join(PROJECT_DIRECTORY, "build", "contracts")
def boolean_argument_type(raw_value: str) -> bool:
TRUE_VALUES = ["1", "t", "y", "true", "yes"]
FALSE_VALUES = ["0", "f", "n", "false", "no"]
if raw_value.lower() in TRUE_VALUES:
return True
elif raw_value.lower() in FALSE_VALUES:
return False
raise ValueError(
f"Invalid boolean argument: {raw_value}. Value must be one of: {','.join(TRUE_VALUES + FALSE_VALUES)}"
)
def bytes_argument_type(raw_value: str) -> bytes:
return raw_value.encode()
def get_abi_json(abi_name: str) -> List[Dict[str, Any]]:
abi_full_path = os.path.join(BUILD_DIRECTORY, f"{abi_name}.json")
if not os.path.isfile(abi_full_path):
raise IOError(
f"File does not exist: {abi_full_path}. Maybe you have to compile the smart contracts?"
)
with open(abi_full_path, "r") as ifp:
build = json.load(ifp)
abi_json = build.get("abi")
if abi_json is None:
raise ValueError(f"Could not find ABI definition in: {abi_full_path}")
return abi_json
def contract_from_build(abi_name: str) -> ContractContainer:
# This is workaround because brownie currently doesn't support loading the same project multiple
# times. This causes problems when using multiple contracts from the same project in the same
# python project.
PROJECT = project.main.Project("moonworm", Path(PROJECT_DIRECTORY))
abi_full_path = os.path.join(BUILD_DIRECTORY, f"{abi_name}.json")
if not os.path.isfile(abi_full_path):
raise IOError(
f"File does not exist: {abi_full_path}. Maybe you have to compile the smart contracts?"
)
with open(abi_full_path, "r") as ifp:
build = json.load(ifp)
return ContractContainer(PROJECT, build)
class TerminusInitializer:
def __init__(self, contract_address: Optional[ChecksumAddress]):
self.contract_name = "TerminusInitializer"
self.address = contract_address
self.contract = None
self.abi = get_abi_json("TerminusInitializer")
if self.address is not None:
self.contract: Optional[Contract] = Contract.from_abi(
self.contract_name, self.address, self.abi
)
def deploy(self, transaction_config):
contract_class = contract_from_build(self.contract_name)
deployed_contract = contract_class.deploy(transaction_config)
self.address = deployed_contract.address
self.contract = deployed_contract
def assert_contract_is_instantiated(self) -> None:
if self.contract is None:
raise Exception("contract has not been instantiated")
def init(self, transaction_config) -> Any:
self.assert_contract_is_instantiated()
return self.contract.init(transaction_config)
def get_transaction_config(args: argparse.Namespace) -> Dict[str, Any]:
signer = network.accounts.load(args.sender, args.password)
transaction_config: Dict[str, Any] = {"from": signer}
if args.gas_price is not None:
transaction_config["gas_price"] = args.gas_price
if args.confirmations is not None:
transaction_config["required_confs"] = args.confirmations
return transaction_config
def add_default_arguments(parser: argparse.ArgumentParser, transact: bool) -> None:
parser.add_argument(
"--network", required=True, help="Name of brownie network to connect to"
)
parser.add_argument(
"--address", required=False, help="Address of deployed contract to connect to"
)
if not transact:
return
parser.add_argument(
"--sender", required=True, help="Path to keystore file for transaction sender"
)
parser.add_argument(
"--password",
required=False,
help="Password to keystore file (if you do not provide it, you will be prompted for it)",
)
parser.add_argument(
"--gas-price", default=None, help="Gas price at which to submit transaction"
)
parser.add_argument(
"--confirmations",
type=int,
default=None,
help="Number of confirmations to await before considering a transaction completed",
)
def handle_deploy(args: argparse.Namespace) -> None:
network.connect(args.network)
transaction_config = get_transaction_config(args)
contract = TerminusInitializer(None)
result = contract.deploy(transaction_config=transaction_config)
print(result)
def handle_init(args: argparse.Namespace) -> None:
network.connect(args.network)
contract = TerminusInitializer(args.address)
transaction_config = get_transaction_config(args)
result = contract.init(transaction_config=transaction_config)
print(result)
def generate_cli() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="CLI for TerminusInitializer")
parser.set_defaults(func=lambda _: parser.print_help())
subcommands = parser.add_subparsers()
deploy_parser = subcommands.add_parser("deploy")
add_default_arguments(deploy_parser, True)
deploy_parser.set_defaults(func=handle_deploy)
init_parser = subcommands.add_parser("init")
add_default_arguments(init_parser, True)
init_parser.set_defaults(func=handle_init)
return parser
def main() -> None:
parser = generate_cli()
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()

Wyświetl plik

@ -1,6 +1,6 @@
import argparse
from . import core, ERC20Facet, ERC20Initializer
from . import core, ERC20Facet, ERC20Initializer, TerminusFacet, TerminusInitializer
def main():
@ -23,6 +23,16 @@ def main():
add_help=False,
)
terminus_parser = TerminusFacet.generate_cli()
dao_subparsers.add_parser("terminus", parents=[terminus_parser], add_help=False)
terminus_initializer_parser = TerminusInitializer.generate_cli()
dao_subparsers.add_parser(
"terminus-initializer",
parents=[terminus_initializer_parser],
add_help=False,
)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -18,6 +18,8 @@ from . import (
ERC20Facet,
ERC20Initializer,
OwnershipFacet,
TerminusFacet,
TerminusInitializer,
)
FACETS: Dict[str, Any] = {
@ -25,6 +27,7 @@ FACETS: Dict[str, Any] = {
"DiamondLoupeFacet": DiamondLoupeFacet,
"ERC20Facet": ERC20Facet,
"OwnershipFacet": OwnershipFacet,
"TerminusFacet": TerminusFacet,
}
FACET_PRECEDENCE: List[str] = [
@ -32,6 +35,7 @@ FACET_PRECEDENCE: List[str] = [
"OwnershipFacet",
"DiamondLoupeFacet",
"ERC20Facet",
"TerminusFacet",
]
FACET_ACTIONS: Dict[str, int] = {"add": 0, "replace": 1, "remove": 2}
@ -107,6 +111,12 @@ def facet_cut(
if initializer_address != ZERO_ADDRESS and action != "remove":
erc20_initializer = ERC20Initializer.ERC20Initializer(initializer_address)
calldata = erc20_initializer.contract.init.encode_input()
elif facet_name == "TerminusFacet":
if initializer_address != ZERO_ADDRESS and action != "remove":
terminus_initializer = TerminusInitializer.TerminusInitializer(
initializer_address
)
calldata = terminus_initializer.contract.init.encode_input()
diamond = DiamondCutFacet.DiamondCutFacet(diamond_address)
transaction = diamond.diamond_cut(