remove centipede

pull/3/head
yhtiyar 2021-11-03 14:16:27 +03:00
rodzic d2ae01354d
commit 0c196d8f55
34 zmienionych plików z 3 dodań i 4282 usunięć

6
.gitmodules vendored
Wyświetl plik

@ -1,3 +1,3 @@
[submodule "centipede/fixture/smart_contracts/openzeppelin-contracts"]
path = centipede/fixture/smart_contracts/openzeppelin-contracts
url = git@github.com:OpenZeppelin/openzeppelin-contracts.git
[submodule "moonworm/fixture/smart_contracts/openzeppelin-contracts"]
path = moonworm/fixture/smart_contracts/openzeppelin-contracts
url = git@github.com:OpenZeppelin/openzeppelin-contracts.git

Wyświetl plik

@ -1,125 +0,0 @@
import argparse
import json
import os
from shutil import copyfile
from .contracts import ERC20, ERC721
from .generator import (
generate_contract_cli_content,
generate_contract_interface_content,
)
def write_file(content: str, path: str):
with open(path, "w") as ofp:
ofp.write(content)
def copy_web3_util(dest_dir: str, force: bool = False) -> None:
dest_filepath = os.path.join(dest_dir, "web3_util.py")
if os.path.isfile(dest_filepath) and not force:
print(f"{dest_filepath} file already exists. Use -f to rewrite")
web3_util_path = os.path.join(os.path.dirname(__file__), "web3_util.py")
copyfile(web3_util_path, dest_filepath)
def create_init_py(dest_dir: str, force: bool = False) -> None:
dest_filepath = os.path.join(dest_dir, "__init__.py")
if os.path.isfile(dest_filepath) and not force:
print(f"{dest_filepath} file already exists. Use -f to rewrite")
with open(dest_filepath, "w") as ofp:
ofp.write("")
def handle_generate(args: argparse.Namespace) -> None:
args.name = args.name + "_"
if args.abi == "erc20":
contract_abi = ERC20.abi()
write_file(
ERC20.bytecode(), os.path.join(args.outdir, args.name + "bytecode.bin")
)
elif args.abi == "erc721":
contract_abi = ERC721.abi()
write_file(
ERC721.bytecode(), os.path.join(args.outdir, args.name + "bytecode.bin")
)
else:
with open(args.abi, "r") as ifp:
contract_abi = json.load(ifp)
abi_file_name = args.name + "abi.json"
write_file(json.dumps(contract_abi), os.path.join(args.outdir, abi_file_name))
copy_web3_util(args.outdir, args.force)
create_init_py(args.outdir, args.force)
if args.interface:
interface_content = generate_contract_interface_content(
contract_abi, abi_file_name
)
interface_name = args.name + "interface.py"
write_file(interface_content, os.path.join(args.outdir, interface_name))
if args.cli:
cli_content = generate_contract_cli_content(contract_abi, abi_file_name)
cli_name = args.name + "cli.py"
write_file(cli_content, os.path.join(args.outdir, cli_name))
def generate_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Centipede: Manage your smart contract"
)
parser.set_defaults(func=lambda _: parser.print_help())
subcommands = parser.add_subparsers()
generate_parser = subcommands.add_parser(
"generate", description="Centipede code generator"
)
generate_parser.add_argument(
"-i",
"--abi",
required=True,
help=f"Path to contract abi JSON file or (erc20|erc721)",
)
generate_parser.add_argument(
"-o",
"--outdir",
required=True,
help=f"Output directory where files will be generated.",
)
generate_parser.add_argument(
"--interface",
action="store_true",
help="Generate python interface for given smart contract abi",
)
generate_parser.add_argument(
"--cli",
action="store_true",
help="Generate cli for given smart contract abi",
)
generate_parser.add_argument(
"--name",
"-n",
required=True,
help="Prefix name for generated files",
)
generate_parser.add_argument(
"--force",
"-f",
action="store_true",
help="Force rewrite generated files",
)
generate_parser.set_defaults(func=handle_generate)
return parser
def main() -> None:
parser = generate_argument_parser()
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()

Wyświetl plik

@ -1,150 +0,0 @@
# Code generated by moonstream centipede : https://github.com/bugout-dev/centipede
# Centipede version : {centipede_version}
import argparse
import json
from typing import Any, Callable, Dict, List, Optional, Tuple
import os
from eth_typing.evm import Address, ChecksumAddress
import web3
from web3 import Web3
from web3.contract import Contract
from .web3_util import *
abi_path = os.path.join(os.path.dirname(__file__), "{abi_file_name}")
with open(abi_path, "r") as abi_file:
CONTRACT_ABI = json.load(abi_file)
CONTRACT_FUNCTIONS = {{}}
for abi_item in CONTRACT_ABI:
if abi_item["type"] == "function":
CONTRACT_FUNCTIONS[abi_item["name"]] = abi_item
if abi_item["type"] == "constructor":
CONTRACT_FUNCTIONS["constructor"] = abi_item
def init_web3(ipc_path: str) -> Web3:
return Web3(web3.HTTPProvider(ipc_path))
def init_contract(web3: Web3, abi: Dict[str, Any], address: Optional[str]) -> Contract:
checksum_address: Optional[ChecksumAddress] = None
if address is not None:
checksum_address = web3.toChecksumAddress(address)
return web3.eth.contract(address=checksum_address, abi=abi)
def make_function_call(contract: Contract, function_name: str, *args):
return contract.functions[function_name](*args).call()
def populate_subparser_with_common_args(
leaf_parser: argparse.ArgumentParser,
) -> None:
leaf_parser.add_argument(
"-w",
"--web3",
required=True,
help=f"Web3 IPC connection",
)
leaf_parser.add_argument(
"-c",
"--contract_address",
required=True,
help=f"contract_address",
)
def populate_deploy_subparser(
leaf_parser: argparse.ArgumentParser,
) -> None:
leaf_parser.add_argument(
"-w",
"--web3",
required=True,
help=f"Web3 IPC connection",
)
leaf_parser.add_argument(
"-b",
"--bytecode-path",
required=True,
help=f"Path to contract bytecode",
)
{cli_content}
def get_address_and_private_key():
try:
address, pk = read_keys_from_env()
except Exception as e:
print(f"{{str(e)}}, Reading from cli")
address, pk = read_keys_from_cli()
return address, pk
def handle_args(args: argparse.Namespace):
# Initializng contract
web3 = init_web3(args.web3)
kwargs = vars(args)
if args.subcommand == "call":
contract_address = web3.toChecksumAddress(args.contract_address)
contract = web3.eth.contract(address=contract_address, abi=CONTRACT_ABI)
function_name = kwargs["function_name"]
del kwargs["function_name"]
call_args = [
cast_to_python_type(f_arg["type"])(kwargs[f_arg["name"]])
for f_arg in CONTRACT_FUNCTIONS[function_name]["inputs"]
]
print(make_function_call(contract, function_name, *call_args))
if args.subcommand == "transact":
contract_address = web3.toChecksumAddress(args.contract_address)
contract = web3.eth.contract(address=contract_address, abi=CONTRACT_ABI)
function_name = kwargs["function_name"]
del kwargs["function_name"]
call_args = [
cast_to_python_type(f_arg["type"])(kwargs[f_arg["name"]])
for f_arg in CONTRACT_FUNCTIONS[function_name]["inputs"]
]
address, pk = get_address_and_private_key()
transaction = build_transaction(
web3, contract.functions[function_name](*call_args), address
)
print(f"Transaction:\n{{transaction}}")
answer = input("Sign and submit transaction? (y/Y for yes)")
if answer == "Y" or answer == "y":
transaction_hash = submit_transaction(web3, transaction, pk)
print("Transaction is submitted, waiting for transaction receipt...")
tx_receipt = wait_for_transaction_receipt(web3, transaction_hash)
print(f"Got receipt:\n{{tx_receipt}}")
if args.subcommand == "deploy":
with open(args.bytecode_path, "r") as ifp:
bytecode = ifp.read()
address, pk = get_address_and_private_key()
constructor_args = [
cast_to_python_type(f_arg["type"])(kwargs[f_arg["name"]])
for f_arg in CONTRACT_FUNCTIONS["constructor"]["inputs"]
]
print("Deploying contract")
tx_hash, contract_address = deploy_contract(
web3,
bytecode,
CONTRACT_ABI,
deployer=address,
deployer_private_key=pk,
constructor_arguments=constructor_args,
)
print(f"Tx hash of deployment: {{web3.toHex(tx_hash)}}")
print(f"Address of deployed_contract: {{contract_address}}")
def main() -> None:
parser = generate_argument_parser()
args = parser.parse_args()
handle_args(args)
if __name__ == "__main__":
main()

Wyświetl plik

@ -1,33 +0,0 @@
# Code generated by moonstream centipede : https://github.com/bugout-dev/centipede
# Centipede version : {centipede_version}
import json
import os
from typing import Any, Dict, Union
from eth_typing.evm import Address, ChecksumAddress
from web3 import Web3
from web3.contract import ContractFunction
from .web3_util import *
abi_path = os.path.join(os.path.dirname(__file__), "{abi_file_name}")
with open(abi_path, "r") as abi_file:
CONTRACT_ABI = json.load(abi_file)
{contract_body}
def deploy(
web3: Web3,
contract_constructor: ContractFunction,
contract_bytecode: str,
deployer_address: ChecksumAddress,
deployer_private_key: str,
) -> Contract:
tx_hash, contract_address = deploy_contract_from_constructor_function(
web3,
constructor=contract_constructor,
contract_bytecode=contract_bytecode,
contract_abi=CONTRACT_ABI,
deployer=deployer_address,
deployer_private_key=deployer_private_key,
)
return Contract(web3, contract_address)

Wyświetl plik

@ -1,40 +0,0 @@
import json
import os
from typing import Any, Dict
_PATHS = {
"abi": {
"erc20": "fixture/abis/CentipedeERC20.json",
"erc1155": "fixture/abis/CentipedeERC1155.json",
"erc721": "fixture/abis/CentipedeERC721.json",
},
"bytecode": {
"erc20": "fixture/bytecodes/CentipedeERC20.bin",
"erc1155": "fixture/bytecodes/CentipedeERC1155.bin",
"erc721": "fixture/bytecodes/CentipedeERC721.bin",
},
}
class CentipedeContract:
def __init__(self, abi_path: str, bytecode_path: str) -> None:
self._abi_path = abi_path
self._bytecode_path = bytecode_path
def abi(self) -> Dict[str, Any]:
base_dir = os.path.dirname(__file__)
with open(os.path.join(base_dir, self._abi_path), "r") as ifp:
abi = json.load(ifp)
return abi
def bytecode(self) -> str:
base_dir = os.path.dirname(__file__)
with open(os.path.join(base_dir, self._bytecode_path), "r") as ifp:
bytecode = ifp.read()
return bytecode
ERC20 = CentipedeContract(_PATHS["abi"]["erc20"], _PATHS["bytecode"]["erc20"])
ERC721 = CentipedeContract(_PATHS["abi"]["erc721"], _PATHS["bytecode"]["erc721"])
ERC1155 = CentipedeContract(_PATHS["abi"]["erc1155"], _PATHS["bytecode"]["erc1155"])

Wyświetl plik

@ -1,129 +0,0 @@
import sys
import time
import logging
from web3.providers.rpc import HTTPProvider
from web3 import Web3
from .log_scanner import EventScanner
from .state import JSONifiedState
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
if __name__ == "__main__":
# Simple demo that scans all the token transfers of RCC token (11k).
# The demo supports persistant state by using a JSON file.
# You will need an Ethereum node for this.
# Running this script will consume around 20k JSON-RPC calls.
# With locally running Geth, the script takes 10 minutes.
# The resulting JSON state file is 2.9 MB.
# We use tqdm library to render a nice progress bar in the console
# https://pypi.org/project/tqdm/
from tqdm import tqdm
# RCC has around 11k Transfer events
# https://etherscan.io/token/0x9b6443b0fb9c241a7fdac375595cea13e6b7807a
RCC_ADDRESS = "0x7ceb23fd6bc0add59e62ac25578270cff1b9f619"
# Reduced ERC-20 ABI, only Transfer event
ABI = [
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "from", "type": "address"},
{"indexed": True, "name": "to", "type": "address"},
{"indexed": False, "name": "value", "type": "uint256"},
],
"name": "Transfer",
"type": "event",
}
]
def run():
if len(sys.argv) < 2:
print("Usage: eventscanner.py http://your-node-url")
sys.exit(1)
api_url = sys.argv[1]
# Enable logs to the stdout.
# DEBUG is very verbose level
logging.basicConfig(level=logging.INFO)
provider = HTTPProvider(api_url)
# Remove the default JSON-RPC retry middleware
# as it correctly cannot handle eth_getLogs block range
# throttle down.
provider.middlewares.clear()
web3 = Web3(provider)
print(web3.eth.block_number)
# Restore/create our persistent state
state = JSONifiedState()
state.restore()
# chain_id: int, web3: Web3, abi: dict, state: EventScannerState, events: List, filters: {}, max_chunk_scan_size: int=10000
scanner = EventScanner(
web3=web3,
scanner_state=state,
events=ABI,
addresses=[RCC_ADDRESS],
# How many maximum blocks at the time we request from JSON-RPC
# and we are unlikely to exceed the response size limit of the JSON-RPC server
max_chunk_scan_size=100000,
skip_block_timestamp=True,
)
# Assume we might have scanned the blocks all the way to the last Ethereum block
# that mined a few seconds before the previous scan run ended.
# Because there might have been a minor Etherueum chain reorganisations
# since the last scan ended, we need to discard
# the last few blocks from the previous scan results.
chain_reorg_safety_blocks = 10
scanner.delete_potentially_forked_block_data(
state.get_last_scanned_block() - chain_reorg_safety_blocks
)
# Scan from [last block scanned] - [latest ethereum block]
# Note that our chain reorg safety blocks cannot go negative
# start_block = max(state.get_last_scanned_block() - chain_reorg_safety_blocks, 0)
end_block = scanner.get_suggested_scan_end_block()
start_block = end_block - 1000
blocks_to_scan = end_block - start_block
print(f"Scanning events from blocks {start_block} - {end_block}")
# Render a progress bar in the console
start = time.time()
with tqdm(total=blocks_to_scan) as progress_bar:
def _update_progress(
start, end, current, current_block_timestamp, chunk_size, events_count
):
if current_block_timestamp:
formatted_time = current_block_timestamp.strftime("%d-%m-%Y")
else:
formatted_time = ""
progress_bar.set_description(
f"Current block: {current} ({formatted_time}), blocks in a scan batch: {chunk_size}, events processed in a batch {events_count}"
)
progress_bar.update(chunk_size)
# Run the scan
result, total_chunks_scanned = scanner.scan(
start_block, end_block, progress_callback=_update_progress
)
state.save()
duration = time.time() - start
print(
f"Scanned total {len(result)} Transfer events, in {duration} seconds, total {total_chunks_scanned} chunk scans performed"
)
run()

Wyświetl plik

@ -1,392 +0,0 @@
# Used example scanner from web3 documentation : https://web3py.readthedocs.io/en/stable/examples.html#eth-getlogs-limitations
import datetime
import logging
import time
from typing import Iterable, List, Tuple, Optional, Callable
from typing import Optional
from eth_abi.codec import ABICodec
from eth_typing.evm import ChecksumAddress
from web3 import Web3
from web3._utils.filters import construct_event_filter_params
from web3._utils.events import get_event_data
from web3.datastructures import AttributeDict
from web3.exceptions import BlockNotFound
from web3.types import ABIEvent, FilterParams
from .state import EventScannerState
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _retry_web3_call(func, start_block, end_block, retries, delay) -> Tuple[int, list]:
"""A custom retry loop to throttle down block range.
If our JSON-RPC server cannot serve all incoming `eth_getLogs` in a single request,
we retry and throttle down block range for every retry.
For example, Go Ethereum does not indicate what is an acceptable response size.
It just fails on the server-side with a "context was cancelled" warning.
:param func: A callable that triggers Ethereum JSON-RPC, as func(start_block, end_block)
:param start_block: The initial start block of the block range
:param end_block: The initial start block of the block range
:param retries: How many times we retry
:param delay: Time to sleep between retries
"""
for i in range(retries):
try:
return end_block, func(start_block, end_block)
except Exception as e:
# Assume this is HTTPConnectionPool(host='localhost', port=8545): Read timed out. (read timeout=10)
# from Go Ethereum. This translates to the error "context was cancelled" on the server side:
# https://github.com/ethereum/go-ethereum/issues/20426
if i < retries - 1:
# Give some more verbose info than the default middleware
logger.warning(
"Retrying events for block range %d - %d (%d) failed with %s, retrying in %s seconds",
start_block,
end_block,
end_block - start_block,
e,
delay,
)
# Decrease the `eth_getBlocks` range
end_block = start_block + ((end_block - start_block) // 2)
# Let the JSON-RPC to recover e.g. from restart
time.sleep(delay)
continue
else:
logger.warning("Out of retries")
raise
def _fetch_events_chunk(
web3,
event_abi,
from_block: int,
to_block: int,
addresses: Optional[List[ChecksumAddress]] = None,
) -> Iterable:
"""Get events using eth_getLogs API.
This method is detached from any contract instance.
This is a stateless method, as opposed to createFilter.
It can be safely called against nodes which do not provide `eth_newFilter` API, like Infura.
"""
if from_block is None:
raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock")
# Depending on the Solidity version used to compile
# the contract that uses the ABI,
# it might have Solidity ABI encoding v1 or v2.
# We just assume the default that you set on Web3 object here.
# More information here https://eth-abi.readthedocs.io/en/latest/index.html
codec: ABICodec = web3.codec
# Here we need to poke a bit into Web3 internals, as this
# functionality is not exposed by default.
# Construct JSON-RPC raw filter presentation based on human readable Python descriptions
# Namely, convert event names to their keccak signatures
# More information here:
# https://github.com/ethereum/web3.py/blob/e176ce0793dafdd0573acc8d4b76425b6eb604ca/web3/_utils/filters.py#L71
# TODO(yhtiyar): Add argument filters and contract address filters
data_filter_set, event_filter_params = construct_event_filter_params(
event_abi,
codec,
fromBlock=from_block,
toBlock=to_block,
)
if addresses:
event_filter_params["address"] = addresses
logger.debug(
"Querying eth_getLogs with the following parameters: %s",
event_filter_params,
)
# Call JSON-RPC API on your Ethereum node.
# get_logs() returns raw AttributedDict entries
logs = web3.eth.get_logs(event_filter_params)
# Convert raw binary data to Python proxy objects as described by ABI
all_events = []
for log in logs:
# Convert raw JSON-RPC log result to human readable event by using ABI data
# More information how processLog works here
# https://github.com/ethereum/web3.py/blob/fbaf1ad11b0c7fac09ba34baff2c256cffe0a148/web3/_utils/events.py#L200
try:
evt = get_event_data(codec, event_abi, log)
# Note: This was originally yield,
# but deferring the timeout exception caused the throttle logic not to work
all_events.append(evt)
except:
continue
return all_events
class EventScanner:
def __init__(
self,
web3: Web3,
events: List,
addresses: Optional[str] = None,
scanner_state: Optional[EventScannerState] = None,
max_chunk_scan_size: int = 10000,
max_request_retries: int = 30,
request_retry_seconds: float = 3.0,
skip_block_timestamp: bool = False,
):
"""
:param events: List of web3 Event we scan
:param max_chunk_scan_size: JSON-RPC API limit in the number of blocks we query. (Recommendation: 10,000 for mainnet, 500,000 for testnets)
:param max_request_retries: How many times we try to reattempt a failed JSON-RPC call
:param request_retry_seconds: Delay between failed requests to let JSON-RPC server to recover
"""
self.web3 = web3
self.state = scanner_state
self.events = events
self.skip_block_timestamp = skip_block_timestamp
self.checksum_addresses = []
if addresses:
for address in addresses:
self.checksum_addresses.append(web3.toChecksumAddress(address))
# Our JSON-RPC throttling parameters
self.min_scan_chunk_size = 10 # 12 s/block = 120 seconds period
self.max_scan_chunk_size = max_chunk_scan_size
self.max_request_retries = max_request_retries
self.request_retry_seconds = request_retry_seconds
# Factor how fast we increase the chunk size if results are found
# # (slow down scan after starting to get hits)
self.chunk_size_decrease = 0.5
# Factor how was we increase chunk size if no results found
self.chunk_size_increase = 2.0
def get_block_timestamp(self, block_num) -> Optional[datetime.datetime]:
"""Get Ethereum block timestamp"""
if self.skip_block_timestamp:
# Returning None since, config set to skip getting block timestamp data
return None
try:
block_info = self.web3.eth.getBlock(block_num)
except BlockNotFound:
# Block was not mined yet,
# minor chain reorganisation?
return None
last_time = block_info["timestamp"]
return datetime.datetime.utcfromtimestamp(last_time)
def get_suggested_scan_start_block(self):
"""Get where we should start to scan for new token events.
If there are no prior scans, start from block 1.
Otherwise, start from the last end block minus ten blocks.
We rescan the last ten scanned blocks in the case there were forks to avoid
misaccounting due to minor single block works (happens once in a hour in Ethereum).
These heurestics could be made more robust, but this is for the sake of simple reference implementation.
"""
end_block = self.get_last_scanned_block()
if end_block:
return max(1, end_block - self.NUM_BLOCKS_RESCAN_FOR_FORKS)
return 1
def get_suggested_scan_end_block(self):
"""Get the last mined block on Ethereum chain we are following."""
# Do not scan all the way to the final block, as this
# block might not be mined yet
return self.web3.eth.blockNumber - 1
def get_last_scanned_block(self) -> int:
return self.state.get_last_scanned_block()
def delete_potentially_forked_block_data(self, after_block: int):
"""Purge old data in the case of blockchain reorganisation."""
self.state.delete_data(after_block)
def estimate_next_chunk_size(self, current_chuck_size: int, event_found_count: int):
"""Try to figure out optimal chunk size
Our scanner might need to scan the whole blockchain for all events
* We want to minimize API calls over empty blocks
* We want to make sure that one scan chunk does not try to process too many entries once, as we try to control commit buffer size and potentially asynchronous busy loop
* Do not overload node serving JSON-RPC API by asking data for too many events at a time
Currently Ethereum JSON-API does not have an API to tell when a first event occured in a blockchain
and our heuristics try to accelerate block fetching (chunk size) until we see the first event.
These heurestics exponentially increase the scan chunk size depending on if we are seeing events or not.
When any transfers are encountered, we are back to scanning only a few blocks at a time.
It does not make sense to do a full chain scan starting from block 1, doing one JSON-RPC call per 20 blocks.
"""
if event_found_count > 0:
# When we encounter first events, reset the chunk size window
current_chuck_size = self.min_scan_chunk_size
else:
current_chuck_size *= self.chunk_size_increase
current_chuck_size = max(self.min_scan_chunk_size, current_chuck_size)
current_chuck_size = min(self.max_scan_chunk_size, current_chuck_size)
return int(current_chuck_size)
def scan_chunk(self, start_block, end_block) -> Tuple[int, datetime.datetime, list]:
"""Read and process events between to block numbers.
Dynamically decrease the size of the chunk if the case JSON-RPC server pukes out.
:return: tuple(actual end block number, when this block was mined, processed events)
"""
block_timestamps = {}
get_block_timestamp = self.get_block_timestamp
# Cache block timestamps to reduce some RPC overhead
# Real solution might include smarter models around block
def get_block_when(block_num):
if block_num not in block_timestamps:
block_timestamps[block_num] = get_block_timestamp(block_num)
return block_timestamps[block_num]
all_processed = []
for event_type in self.events:
# Callable that takes care of the underlying web3 call
def _fetch_events(_start_block, _end_block):
return _fetch_events_chunk(
self.web3,
event_type,
from_block=_start_block,
to_block=_end_block,
addresses=self.checksum_addresses,
)
# Do `n` retries on `eth_getLogs`,
# throttle down block range if needed
end_block, events = _retry_web3_call(
_fetch_events,
start_block=start_block,
end_block=end_block,
retries=self.max_request_retries,
delay=self.request_retry_seconds,
)
for evt in events:
idx = evt[
"logIndex"
] # Integer of the log index position in the block, null when its pending
# We cannot avoid minor chain reorganisations, but
# at least we must avoid blocks that are not mined yet
assert idx is not None, "Somehow tried to scan a pending block"
block_number = evt["blockNumber"]
# Get UTC time when this event happened (block mined timestamp)
# from our in-memory cache
block_when = get_block_when(block_number)
logger.debug(
"Processing event %s, block:%d count:%d",
evt["event"],
evt["blockNumber"],
)
processed = self.state.process_event(block_when, evt)
all_processed.append(processed)
end_block_timestamp = get_block_when(end_block)
return end_block, end_block_timestamp, all_processed
def scan(
self,
start_block,
end_block,
start_chunk_size=20,
progress_callback=Optional[Callable],
) -> Tuple[list, int]:
"""Perform a token balances scan.
Assumes all balances in the database are valid before start_block (no forks sneaked in).
:param start_block: The first block included in the scan
:param end_block: The last block included in the scan
:param start_chunk_size: How many blocks we try to fetch over JSON-RPC on the first attempt
:param progress_callback: If this is an UI application, update the progress of the scan
:return: [All processed events, number of chunks used]
"""
assert start_block <= end_block
current_block = start_block
# Scan in chunks, commit between
chunk_size = start_chunk_size
last_scan_duration = last_logs_found = 0
total_chunks_scanned = 0
# All processed entries we got on this scan cycle
all_processed = []
while current_block <= end_block:
self.state.start_chunk(current_block, chunk_size)
# Print some diagnostics to logs to try to fiddle with real world JSON-RPC API performance
estimated_end_block = current_block + chunk_size
logger.debug(
"Scanning token transfers for blocks: %d - %d, chunk size %d, last chunk scan took %f, last logs found %d",
current_block,
estimated_end_block,
chunk_size,
last_scan_duration,
last_logs_found,
)
start = time.time()
actual_end_block, end_block_timestamp, new_entries = self.scan_chunk(
current_block, estimated_end_block
)
# Where does our current chunk scan ends - are we out of chain yet?
current_end = actual_end_block
last_scan_duration = time.time() - start
all_processed += new_entries
# Print progress bar
if progress_callback:
progress_callback(
start_block,
end_block,
current_block,
end_block_timestamp,
chunk_size,
len(new_entries),
)
# Try to guess how many blocks to fetch over `eth_getLogs` API next time
chunk_size = self.estimate_next_chunk_size(chunk_size, len(new_entries))
# Set where the next chunk starts
current_block = current_end + 1
total_chunks_scanned += 1
self.state.end_chunk(current_end)
return all_processed, total_chunks_scanned

Wyświetl plik

@ -1,2 +0,0 @@
from .event_scanner_state import EventScannerState
from .json_state import JSONifiedState

Wyświetl plik

@ -1,54 +0,0 @@
from abc import ABC, abstractmethod
import datetime
from typing import Optional
from web3.datastructures import AttributeDict
class EventScannerState(ABC):
"""Application state that remembers what blocks we have scanned in the case of crash."""
@abstractmethod
def get_last_scanned_block(self) -> int:
"""Number of the last block we have scanned on the previous cycle.
:return: 0 if no blocks scanned yet
"""
@abstractmethod
def start_chunk(self, block_number: int):
"""Scanner is about to ask data of multiple blocks over JSON-RPC.
Start a database session if needed.
"""
@abstractmethod
def end_chunk(self, block_number: int):
"""Scanner finished a number of blocks.
Persistent any data in your state now.
"""
@abstractmethod
def process_event(
self, block_when: Optional[datetime.datetime], event: AttributeDict
) -> object:
"""Process incoming events.
This function takes raw events from Web3, transforms them to your application internal
format, then saves them in a database or some other state.
:param block_when: When this block was mined
:param event: Symbolic dictionary of the event data
:return: Internal state structure that is the result of event tranformation.
"""
@abstractmethod
def delete_data(self, since_block: int) -> int:
"""Delete any data since this block was scanned.
Purges any potential minor reorg data.
"""

Wyświetl plik

@ -1,114 +0,0 @@
import json
import time
import datetime
from typing import Optional
from web3.datastructures import AttributeDict
from .event_scanner_state import EventScannerState
class JSONifiedState(EventScannerState):
"""Store the state of scanned blocks and all events.
All state is an in-memory dict.
Simple load/store massive JSON on start up.
"""
def __init__(self):
self.state = None
self.fname = "test-state.json"
# How many second ago we saved the JSON file
self.last_save = 0
def reset(self):
"""Create initial state of nothing scanned."""
self.state = {
"last_scanned_block": 0,
"blocks": {},
}
def restore(self):
"""Restore the last scan state from a file."""
try:
self.state = json.load(open(self.fname, "rt"))
print(
f"Restored the state, previously {self.state['last_scanned_block']} blocks have been scanned"
)
except (IOError, json.decoder.JSONDecodeError):
print("State starting from scratch")
self.reset()
def save(self):
"""Save everything we have scanned so far in a file."""
with open(self.fname, "wt") as f:
json.dump(self.state, f)
self.last_save = time.time()
#
# EventScannerState methods implemented below
#
def get_last_scanned_block(self):
"""The number of the last block we have stored."""
return self.state["last_scanned_block"]
def delete_data(self, since_block):
"""Remove potentially reorganised blocks from the scan data."""
for block_num in range(since_block, self.get_last_scanned_block()):
if block_num in self.state["blocks"]:
del self.state["blocks"][block_num]
def start_chunk(self, block_number, chunk_size):
pass
def end_chunk(self, block_number):
"""Save at the end of each block, so we can resume in the case of a crash or CTRL+C"""
# Next time the scanner is started we will resume from this block
self.state["last_scanned_block"] = block_number
# Save the database file for every minute
if time.time() - self.last_save > 60:
self.save()
def process_event(
self, block_when: Optional[datetime.datetime], event: AttributeDict
) -> str:
"""Record a ERC-20 transfer in our database."""
# Events are keyed by their transaction hash and log index
# One transaction may contain multiple events
# and each one of those gets their own log index
# event_name = event.event # "Transfer"
log_index = event.logIndex # Log index within the block
# transaction_index = event.transactionIndex # Transaction index within the block
txhash = event.transactionHash.hex() # Transaction hash
block_number = event.blockNumber
# Convert ERC-20 Transfer event to our internal format
args = event["args"]
transfer = {
"from": args["from"],
"to": args.to,
"value": args.value,
}
if block_when is not None:
transfer["timestamp"] = block_when.isoformat()
# Create empty dict as the block that contains all transactions by txhash
if block_number not in self.state["blocks"]:
self.state["blocks"][block_number] = {}
block = self.state["blocks"][block_number]
if txhash not in block:
# We have not yet recorded any transfers in this transaction
# (One transaction may contain multiple events if executed by a smart contract).
# Create a tx entry that contains all events by a log index
self.state["blocks"][block_number][txhash] = {}
# Record ERC-20 transfer in our database
self.state["blocks"][block_number][txhash][log_index] = transfer
# Return a pointer that allows us to look up this event later if needed
return f"{block_number}-{txhash}-{log_index}"

Wyświetl plik

@ -1,537 +0,0 @@
[
{
"inputs": [
{
"internalType": "string",
"name": "_name",
"type": "string"
},
{
"internalType": "string",
"name": "_symbol",
"type": "string"
},
{
"internalType": "string",
"name": "_uri",
"type": "string"
},
{
"internalType": "address",
"name": "owner",
"type": "address"
}
],
"stateMutability": "nonpayable",
"type": "constructor"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "account",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"indexed": false,
"internalType": "bool",
"name": "approved",
"type": "bool"
}
],
"name": "ApprovalForAll",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "previousOwner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "OwnershipTransferred",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256[]",
"name": "ids",
"type": "uint256[]"
},
{
"indexed": false,
"internalType": "uint256[]",
"name": "values",
"type": "uint256[]"
}
],
"name": "TransferBatch",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "id",
"type": "uint256"
},
{
"indexed": false,
"internalType": "uint256",
"name": "value",
"type": "uint256"
}
],
"name": "TransferSingle",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "string",
"name": "value",
"type": "string"
},
{
"indexed": true,
"internalType": "uint256",
"name": "id",
"type": "uint256"
}
],
"name": "URI",
"type": "event"
},
{
"inputs": [
{
"internalType": "address",
"name": "account",
"type": "address"
},
{
"internalType": "uint256",
"name": "id",
"type": "uint256"
}
],
"name": "balanceOf",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address[]",
"name": "accounts",
"type": "address[]"
},
{
"internalType": "uint256[]",
"name": "ids",
"type": "uint256[]"
}
],
"name": "balanceOfBatch",
"outputs": [
{
"internalType": "uint256[]",
"name": "",
"type": "uint256[]"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "_account",
"type": "address"
},
{
"internalType": "uint256[]",
"name": "_ids",
"type": "uint256[]"
},
{
"internalType": "uint256",
"name": "_amaunt",
"type": "uint256"
},
{
"internalType": "bytes[]",
"name": "_data",
"type": "bytes[]"
}
],
"name": "batchMint",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "string",
"name": "_cid",
"type": "string"
},
{
"internalType": "bytes",
"name": "_data",
"type": "bytes"
}
],
"name": "create",
"outputs": [
{
"internalType": "uint256",
"name": "_id",
"type": "uint256"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "account",
"type": "address"
},
{
"internalType": "address",
"name": "operator",
"type": "address"
}
],
"name": "isApprovedForAll",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "_account",
"type": "address"
},
{
"internalType": "uint256",
"name": "_id",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "_amaunt",
"type": "uint256"
},
{
"internalType": "bytes",
"name": "_data",
"type": "bytes"
}
],
"name": "mint",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "name",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "owner",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "pause",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "paused",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "renounceOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256[]",
"name": "ids",
"type": "uint256[]"
},
{
"internalType": "uint256[]",
"name": "amounts",
"type": "uint256[]"
},
{
"internalType": "bytes",
"name": "data",
"type": "bytes"
}
],
"name": "safeBatchTransferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "id",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
},
{
"internalType": "bytes",
"name": "data",
"type": "bytes"
}
],
"name": "safeTransferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"internalType": "bool",
"name": "approved",
"type": "bool"
}
],
"name": "setApprovalForAll",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes4",
"name": "interfaceId",
"type": "bytes4"
}
],
"name": "supportsInterface",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "symbol",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "_id",
"type": "uint256"
}
],
"name": "totalSupply",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "transferOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "_id",
"type": "uint256"
}
],
"name": "uri",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
}
]

Wyświetl plik

@ -1 +0,0 @@
[{"inputs":[{"internalType":"string","name":"name_","type":"string"},{"internalType":"string","name":"symbol_","type":"string"},{"internalType":"address","name":"owner","type":"address"}],"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"owner","type":"address"},{"indexed":true,"internalType":"address","name":"spender","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"previousOwner","type":"address"},{"indexed":true,"internalType":"address","name":"newOwner","type":"address"}],"name":"OwnershipTransferred","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"from","type":"address"},{"indexed":true,"internalType":"address","name":"to","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Transfer","type":"event"},{"inputs":[{"internalType":"address","name":"owner","type":"address"},{"internalType":"address","name":"spender","type":"address"}],"name":"allowance","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"approve","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"subtractedValue","type":"uint256"}],"name":"decreaseAllowance","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"addedValue","type":"uint256"}],"name":"increaseAllowance","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"mint","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"name","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"owner","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"renounceOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"symbol","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"totalSupply","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"recipient","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"transfer","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"sender","type":"address"},{"internalType":"address","name":"recipient","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"transferFrom","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"newOwner","type":"address"}],"name":"transferOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"}]

Wyświetl plik

@ -1,423 +0,0 @@
[
{
"inputs": [
{
"internalType": "string",
"name": "name_",
"type": "string"
},
{
"internalType": "string",
"name": "symbol_",
"type": "string"
},
{
"internalType": "address",
"name": "owner",
"type": "address"
}
],
"stateMutability": "nonpayable",
"type": "constructor"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "approved",
"type": "address"
},
{
"indexed": true,
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "Approval",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"indexed": false,
"internalType": "bool",
"name": "approved",
"type": "bool"
}
],
"name": "ApprovalForAll",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "previousOwner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "OwnershipTransferred",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address"
},
{
"indexed": true,
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
},
{
"inputs": [
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "approve",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "owner",
"type": "address"
}
],
"name": "balanceOf",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "getApproved",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"internalType": "address",
"name": "operator",
"type": "address"
}
],
"name": "isApprovedForAll",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "mint",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "name",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "owner",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "ownerOf",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "renounceOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "safeTransferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
},
{
"internalType": "bytes",
"name": "_data",
"type": "bytes"
}
],
"name": "safeTransferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"internalType": "bool",
"name": "approved",
"type": "bool"
}
],
"name": "setApprovalForAll",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes4",
"name": "interfaceId",
"type": "bytes4"
}
],
"name": "supportsInterface",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "symbol",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "tokenURI",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "transferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "transferOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
}
]

Wyświetl plik

@ -1,59 +0,0 @@
[
{
"inputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "constructor"
},
{
"constant": true,
"inputs": [],
"name": "greet",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "greeting",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": false,
"inputs": [
{
"internalType": "string",
"name": "_greeting",
"type": "string"
}
],
"name": "setGreeting",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
}
]

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Wyświetl plik

@ -1,58 +0,0 @@
[
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"name": "from",
"type": "address"
},
{
"indexed": false,
"name": "to",
"type": "address"
},
{
"indexed": false,
"name": "tokenId",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"name": "owner",
"type": "address"
},
{
"indexed": false,
"name": "approved",
"type": "address"
},
{
"indexed": false,
"name": "tokenId",
"type": "uint256"
}
],
"name": "Approval",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"name": "newContract",
"type": "address"
}
],
"name": "ContractUpgrade",
"type": "event"
}
]

Wyświetl plik

@ -1,148 +0,0 @@
///@notice This contract follows ERC1155, only owner can create and mint token
// SPDX-License-Identifier: GPL-2.0
pragma solidity ^0.8.0;
import "./openzeppelin-contracts/contracts/token/ERC1155/ERC1155.sol";
import "./openzeppelin-contracts/contracts/access/Ownable.sol";
import "./openzeppelin-contracts/contracts/utils/Counters.sol";
contract CentipedeERC1155 is ERC1155, Ownable {
using Counters for Counters.Counter;
Counters.Counter private ID;
bool public paused = false;
string public name;
string public symbol;
// Mapping from token ID to token URI
mapping(uint256 => string) private idToUri;
// Mapping from token ID to token supply
mapping(uint256 => uint256) private tokenSupply;
constructor(
string memory _name,
string memory _symbol,
string memory _uri,
address owner
) ERC1155(_uri) {
name = _name;
symbol = _symbol;
transferOwnership(owner);
}
modifier pausable() {
if (paused) {
revert("Paused");
} else {
_;
}
}
/**
* @dev Creates a new NFT type
* @param _cid Content identifier
* @param _data Data to pass if receiver is contract
* @return _id The newly created token ID
*/
function create(string calldata _cid, bytes calldata _data)
external
onlyOwner
returns (uint256 _id)
{
require(bytes(_cid).length > 0, "Err: Missing Content Identifier");
_id = _nextId();
_mint(msg.sender, _id, 0, _data);
string memory _uri = _createUri(_cid);
idToUri[_id] = _uri;
emit URI(_uri, _id);
}
/**
* @dev Mints an existing NFT type
* @notice Enforces a maximum of 1 minting event per NFT type per account
* @param _account Account to mint NFT to (i.e. the owner)
* @param _id ID (i.e. type) of NFT to mint
* // _signature Verified signature granting _account an NFT
* @param _data Data to pass if receiver is contract
*/
function mint(
address _account,
uint256 _id,
uint256 _amount,
bytes calldata _data
) public pausable onlyOwner {
require(_exists(_id), "Err: Invalid ID");
//require(verify(_account, _id, _signature), "Err: Invalid Signature");
_mint(_account, _id, _amount, _data);
tokenSupply[_id] += _amount;
}
/**
* @dev Batch mints multiple different existing NFT types
* @notice Enforces a maximum of 1 minting event per account per NFT type
* @param _account Account to mint NFT to (i.e. the owner)
* @param _ids IDs of the type of NFT to mint
* @param _data Data to pass if receiver is contract
*/
function batchMint(
address _account,
uint256[] calldata _ids,
uint256 _amount,
bytes[] calldata _data
) external pausable onlyOwner {
for (uint256 i = 0; i < _ids.length; i++) {
mint(_account, _ids[i], _amount, _data[i]);
}
}
function _createUri(string memory _cid)
internal
view
returns (string memory _uri)
{
string memory baseUri = super.uri(0);
return string(abi.encodePacked(baseUri, _cid));
}
function _nextId() internal returns (uint256 id) {
ID.increment();
return ID.current();
}
function _exists(uint256 _id) internal view returns (bool) {
return (bytes(idToUri[_id]).length > 0);
}
/**
* @dev Returns the uri of a token given its ID
* @param _id ID of the token to query
* @return uri of the token or an empty string if it does not exist
*/
function uri(uint256 _id) public view override returns (string memory) {
return idToUri[_id];
}
/**
* @dev Returns the total quantity for a token ID
* @param _id ID of the token to query
* @return amount of token in existence
*/
function totalSupply(uint256 _id) public view returns (uint256) {
return tokenSupply[_id];
}
/**
* @dev Pause or unpause the minting and creation of NFTs
*/
function pause() public onlyOwner {
paused = !paused;
}
}

Wyświetl plik

@ -1,24 +0,0 @@
///@notice This contract follows ERC20, only owner can mint new tokens
// SPDX-License-Identifier: GPL-2.0
pragma solidity ^0.8.0;
import "./openzeppelin-contracts/contracts/token/ERC20/ERC20.sol";
import "./openzeppelin-contracts/contracts/access/Ownable.sol";
contract CentipedeERC20 is ERC20, Ownable {
constructor(
string memory name_,
string memory symbol_,
address owner
) ERC20(name_, symbol_) {
transferOwnership(owner);
}
function mint(address account, uint256 amount) public onlyOwner {
_mint(account, amount);
}
function decimals() public view virtual override returns (uint8) {
return 0;
}
}

Wyświetl plik

@ -1,20 +0,0 @@
///@notice This contract follows ERC20, only owner can mint new tokens
// SPDX-License-Identifier: GPL-2.0
pragma solidity ^0.8.0;
import "./openzeppelin-contracts/contracts/token/ERC721/ERC721.sol";
import "./openzeppelin-contracts/contracts/access/Ownable.sol";
contract CentipedeERC721 is ERC721, Ownable {
constructor(
string memory name_,
string memory symbol_,
address owner
) ERC721(name_, symbol_) {
transferOwnership(owner);
}
function mint(address to, uint256 tokenId) public onlyOwner {
_safeMint(to, tokenId);
}
}

Wyświetl plik

@ -1,21 +0,0 @@
pragma solidity >0.5.0;
contract Greeter {
string public greeting;
constructor() public {
greeting = 'Hello';
}
function setGreeting(string memory _greeting) public returns (string memory){
greeting = _greeting;
return greeting;
}
function greet() view public returns (string memory) {
return greeting;
}
function pr_greet() view private returns (string memory) {
return greeting;
}
}

Wyświetl plik

@ -1,9 +0,0 @@
#!/usr/bin/env sh
#shitty script that compiles smart contract with solc
#and puts bytecode to ../bytecodes and abi to ../abis folder
tempdir="$(mktemp -d)"
solc --abi --bin $1 -o "$tempdir"
filename=${1%.*}
cp "$tempdir/$filename.bin" "../bytecodes/"
cp "$tempdir/$filename.abi" "../abis/$filename.json"
rm $tempdir -r

@ -1 +0,0 @@
Subproject commit 2b046d79e1a20b6e108e0675f7a08bfd76808993

Wyświetl plik

@ -1,325 +0,0 @@
import logging
import os
from typing import Any, Dict, List, Union
import keyword
import libcst as cst
from web3.types import ABIFunction
from .version import CENTIPEDE_VERSION
CONTRACT_TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), "contract.py.template")
try:
with open(CONTRACT_TEMPLATE_PATH, "r") as ifp:
INTERFACE_FILE_TEMPLATE = ifp.read()
except Exception as e:
logging.warn(
f"WARNING: Could not load reporter template from {CONTRACT_TEMPLATE_PATH}:"
)
logging.warn(e)
CLI_TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), "cli.py.template")
try:
with open(CLI_TEMPLATE_PATH, "r") as ifp:
CLI_FILE_TEMPLATE = ifp.read()
except Exception as e:
logging.warn(f"WARNING: Could not load reporter template from {CLI_TEMPLATE_PATH}:")
logging.warn(e)
def make_annotation(types: list):
if len(types) == 1:
return cst.Annotation(annotation=cst.Name(types[0]))
union_slice = []
for _type in types:
union_slice.append(
cst.SubscriptElement(
slice=cst.Index(
value=cst.Name(_type),
)
),
)
return cst.Annotation(
annotation=cst.Subscript(value=cst.Name("Union"), slice=union_slice)
)
def normalize_abi_name(name: str) -> str:
if keyword.iskeyword(name):
return name + "_"
else:
return name
def python_type(evm_type: str) -> List[str]:
if evm_type.startswith(("uint", "int")):
return ["int"]
elif evm_type.startswith("bytes"):
return ["bytes"]
elif evm_type == "string":
return ["str"]
elif evm_type == "address":
return ["ChecksumAddress"]
elif evm_type == "bool":
return ["bool"]
else:
raise ValueError(f"Cannot convert to python type {evm_type}")
def generate_contract_class(
abi: List[Dict[str, Any]],
) -> cst.ClassDef:
class_name = "Contract"
class_constructor = cst.FunctionDef(
name=cst.Name("__init__"),
body=cst.IndentedBlock(
body=[
cst.parse_statement("self.web3 = web3"),
cst.parse_statement("self.address = contract_address"),
cst.parse_statement(
"self.contract = web3.eth.contract(address=self.address, abi=CONTRACT_ABI)"
),
]
),
params=cst.Parameters(
params=[
cst.Param(name=cst.Name("self")),
cst.Param(
name=cst.Name("web3"),
annotation=cst.Annotation(annotation=cst.Name("Web3")),
),
cst.Param(
name=cst.Name("contract_address"),
annotation=make_annotation(["Address", "ChecksumAddress"]),
),
]
),
)
contract_constructor = [c for c in abi if c["type"] == "constructor"][0]
contract_constructor["name"] = "constructor"
class_functions = (
[class_constructor]
+ [generate_contract_constructor_function(contract_constructor)]
+ [
generate_contract_function(function)
for function in abi
if function["type"] == "function"
]
)
return cst.ClassDef(
name=cst.Name(class_name), body=cst.IndentedBlock(body=class_functions)
)
def generate_contract_constructor_function(
func_object: Union[Dict[str, Any], int]
) -> cst.FunctionDef:
default_param_name = "arg"
default_counter = 1
func_params = []
param_names = []
for param in func_object["inputs"]:
param_name = normalize_abi_name(param["name"])
if param_name == "":
param_name = f"{default_param_name}{default_counter}"
default_counter += 1
param_type = make_annotation(python_type(param["type"]))
param_names.append(param_name)
func_params.append(
cst.Param(
name=cst.Name(value=param_name),
annotation=param_type,
)
)
func_raw_name = normalize_abi_name(func_object["name"])
func_name = cst.Name(func_raw_name)
proxy_call_code = f"return ContractConstructor({','.join(param_names)})"
func_body = cst.IndentedBlock(body=[cst.parse_statement(proxy_call_code)])
func_returns = cst.Annotation(annotation=cst.Name(value="ContractConstructor"))
return cst.FunctionDef(
name=func_name,
decorators=[cst.Decorator(decorator=cst.Name("staticmethod"))],
params=cst.Parameters(params=func_params),
body=func_body,
returns=func_returns,
)
def generate_contract_function(
func_object: Union[Dict[str, Any], int]
) -> cst.FunctionDef:
default_param_name = "arg"
default_counter = 1
func_params = []
func_params.append(cst.Param(name=cst.Name("self")))
param_names = []
for param in func_object["inputs"]:
param_name = normalize_abi_name(param["name"])
if param_name == "":
param_name = f"{default_param_name}{default_counter}"
default_counter += 1
param_type = make_annotation(python_type(param["type"]))
param_names.append(param_name)
func_params.append(
cst.Param(
name=cst.Name(value=param_name),
annotation=param_type,
)
)
func_raw_name = normalize_abi_name(func_object["name"])
func_name = cst.Name(func_raw_name)
proxy_call_code = (
f"return self.contract.functions.{func_raw_name}({','.join(param_names)})"
)
func_body = cst.IndentedBlock(body=[cst.parse_statement(proxy_call_code)])
func_returns = cst.Annotation(annotation=cst.Name(value="ContractFunction"))
return cst.FunctionDef(
name=func_name,
params=cst.Parameters(params=func_params),
body=func_body,
returns=func_returns,
)
def generate_argument_parser_function(abi: Dict[str, Any]) -> cst.FunctionDef:
def generate_function_subparser(
function_abi: ABIFunction,
description: str,
) -> List[cst.SimpleStatementLine]:
function_name = normalize_abi_name(function_abi["name"])
subparser_init = [
cst.parse_statement(
f'{function_name}_call = call_subcommands.add_parser("{function_name}", description="{description}")'
),
cst.parse_statement(
f'{function_name}_transact = transact_subcommands.add_parser("{function_name}", description="{description}")'
),
]
argument_parsers = []
# TODO(yhtiyar): Functions can have the same name, we will need to ressolve it
default_arg_counter = 1
for arg in function_abi["inputs"]:
arg_name = normalize_abi_name(arg["name"])
if arg_name == "":
arg_name = f"arg{default_arg_counter}"
default_arg_counter += 1
argument_parsers.append(
cst.parse_statement(
f'{function_name}_call.add_argument("{arg_name}", help="Type:{arg["type"]}")'
)
)
argument_parsers.append(
cst.parse_statement(
f'{function_name}_transact.add_argument("{arg_name}", help="Type:{arg["type"]}")'
)
)
return (
subparser_init
+ argument_parsers
+ [
cst.parse_statement(
f"populate_subparser_with_common_args({function_name}_call)"
),
cst.parse_statement(
f"populate_subparser_with_common_args({function_name}_transact)"
),
cst.EmptyLine(),
]
)
parser_init = [
cst.parse_statement(
f'parser = argparse.ArgumentParser(description="Your smart contract cli")'
),
cst.parse_statement(
f'subcommands = parser.add_subparsers(dest="subcommand", required=True)'
),
cst.parse_statement(
f'call = subcommands.add_parser("call",description="Call smart contract function")'
),
cst.parse_statement(
f'call_subcommands = call.add_subparsers(dest="function_name", required=True)'
),
cst.parse_statement(
f'transact = subcommands.add_parser("transact",description="Make transaction to smart contract function")'
),
cst.parse_statement(
f'transact_subcommands = transact.add_subparsers(dest="function_name", required=True)'
),
]
function_abis = [item for item in abi if item["type"] == "function"]
subparsers = []
for function_abi in function_abis:
subparsers.extend(generate_function_subparser(function_abi, "description"))
# Deploy argparser:
contract_constructor = [item for item in abi if item["type"] == "constructor"][0]
deploy_argument_parsers = []
default_arg_counter = 1
for arg in contract_constructor["inputs"]:
arg_name = normalize_abi_name(arg["name"])
if arg_name == "":
arg_name = f"arg{default_arg_counter}"
default_arg_counter += 1
deploy_argument_parsers.append(
cst.parse_statement(
f'deploy.add_argument("{arg_name}", help="Type:{arg["type"]}")'
)
)
deploy_parser = (
[
cst.parse_statement(
'deploy = subcommands.add_parser("deploy", description="Deploy contract")'
)
]
+ deploy_argument_parsers
+ [cst.parse_statement("populate_deploy_subparser(deploy)")]
)
return cst.FunctionDef(
name=cst.Name("generate_argument_parser"),
params=cst.Parameters(),
body=cst.IndentedBlock(
body=parser_init
+ subparsers
+ deploy_parser
+ [cst.parse_statement("return parser")]
),
returns=cst.Annotation(
annotation=cst.Attribute(
value=cst.Name("argparse"), attr=cst.Name("ArgumentParser")
)
),
)
def generate_contract_interface_content(abi: Dict[str, Any], abi_file_name: str) -> str:
contract_body = cst.Module(body=[generate_contract_class(abi)]).code
content = INTERFACE_FILE_TEMPLATE.format(
contract_body=contract_body,
centipede_version=CENTIPEDE_VERSION,
abi_file_name=abi_file_name,
)
return content
def generate_contract_cli_content(abi: Dict[str, Any], abi_file_name: str) -> str:
cli_body = cst.Module(body=[generate_argument_parser_function(abi)]).code
content = CLI_FILE_TEMPLATE.format(
cli_content=cli_body,
centipede_version=CENTIPEDE_VERSION,
abi_file_name=abi_file_name,
)
return content

Wyświetl plik

@ -1,93 +0,0 @@
from typing import Tuple
from eth_typing.evm import ChecksumAddress
from hexbytes.main import HexBytes
from web3 import Web3
from centipede.contracts import ERC1155, ERC20, ERC721, CentipedeContract
from .web3_util import deploy_contract
def _deploy_centipede_token_contract(
web3: Web3,
contract_class: CentipedeContract,
token_name: str,
token_symbol: str,
token_uri: str,
token_owner: ChecksumAddress,
deployer: ChecksumAddress,
deployer_private_key: str,
):
contract_abi = contract_class.abi()
contract_bytecode = contract_class.bytecode()
return deploy_contract(
web3,
contract_bytecode,
contract_abi,
deployer,
deployer_private_key,
[token_name, token_symbol, token_uri, token_owner],
)
def deploy_ERC1155(
web3: Web3,
token_name: str,
token_symbol: str,
token_uri: str,
token_owner: ChecksumAddress,
deployer: ChecksumAddress,
deployer_private_key: str,
) -> Tuple[HexBytes, ChecksumAddress]:
return _deploy_centipede_token_contract(
web3,
ERC1155,
token_name,
token_symbol,
token_uri,
token_owner,
deployer,
deployer_private_key,
)
def deploy_ERC20(
web3: Web3,
token_name: str,
token_symbol: str,
token_owner: ChecksumAddress,
deployer: ChecksumAddress,
deployer_private_key: str,
) -> Tuple[HexBytes, ChecksumAddress]:
contract_abi = ERC20.abi()
contract_bytecode = ERC20.bytecode()
return deploy_contract(
web3,
contract_bytecode,
contract_abi,
deployer,
deployer_private_key,
[token_name, token_symbol, token_owner],
)
def deploy_ERC721(
web3: Web3,
token_name: str,
token_symbol: str,
token_uri: str,
token_owner: ChecksumAddress,
deployer: ChecksumAddress,
deployer_private_key: str,
) -> Tuple[HexBytes, ChecksumAddress]:
return _deploy_centipede_token_contract(
web3,
ERC721,
token_name,
token_symbol,
token_uri,
token_owner,
deployer,
deployer_private_key,
)

Wyświetl plik

@ -1,184 +0,0 @@
import os
from typing import Tuple
import unittest
from eth_typing.evm import ChecksumAddress
from web3 import Web3, EthereumTesterProvider
from ens import ENS
from centipede.manage import deploy_ERC1155
from ..web3_util import (
build_transaction,
decode_transaction_input,
get_nonce,
submit_signed_raw_transaction,
submit_transaction,
wait_for_transaction_receipt,
)
PK = "0x58d23b55bc9cdce1f18c2500f40ff4ab7245df9a89505e9b1fa4851f623d241d"
PK_ADDRESS = "0xdc544d1aa88ff8bbd2f2aec754b1f1e99e1812fd"
def get_web3_test_provider() -> Web3:
return Web3(EthereumTesterProvider())
def airdrop_ether(web3: Web3, to_address: ChecksumAddress):
tx_hash = web3.eth.send_transaction(
{
"from": web3.eth.accounts[0],
"to": to_address,
"value": 100000000,
}
)
web3.eth.wait_for_transaction_receipt(tx_hash)
class CentipedeEthTesterTestCase(unittest.TestCase):
def setUp(self) -> None:
self.basedir = os.path.dirname(os.path.dirname(__file__))
self.web3 = get_web3_test_provider()
self.tester_address = Web3.toChecksumAddress(PK_ADDRESS)
self.tester_address_pk = PK
airdrop_ether(self.web3, self.tester_address)
def check_eth_send(
self,
sender_previous_balance,
receiver_previous_balance,
sender_current_balance,
receiver_current_balance,
send_value,
tx_receipt,
):
assert receiver_current_balance == receiver_previous_balance + send_value
assert (
sender_current_balance
== sender_previous_balance - send_value - tx_receipt["gasUsed"]
)
def test_submit_transaction(self) -> None:
sender = Web3.toChecksumAddress(PK_ADDRESS)
self.web3.eth.send_transaction
receiver = Web3.toChecksumAddress(self.web3.eth.accounts[1])
sender_previous_balance = self.web3.eth.get_balance(sender)
receiver_previous_balance = self.web3.eth.get_balance(receiver)
send_value = 10
transaction = {
"from": sender,
"to": receiver,
"value": send_value,
"nonce": get_nonce(self.web3, sender),
"gasPrice": 1,
}
transaction["gas"] = self.web3.eth.estimate_gas(transaction)
tx_hash = submit_transaction(
self.web3,
transaction,
PK,
)
tx_receipt = wait_for_transaction_receipt(self.web3, tx_hash)
sender_current_balance = self.web3.eth.get_balance(sender)
receiver_current_balance = self.web3.eth.get_balance(receiver)
self.check_eth_send(
sender_previous_balance,
receiver_previous_balance,
sender_current_balance,
receiver_current_balance,
send_value,
tx_receipt,
)
def test_submit_signed_transaction(self) -> None:
sender = Web3.toChecksumAddress(PK_ADDRESS)
self.web3.eth.send_transaction
receiver = Web3.toChecksumAddress(self.web3.eth.accounts[1])
sender_previous_balance = self.web3.eth.get_balance(sender)
receiver_previous_balance = self.web3.eth.get_balance(receiver)
send_value = 10
transaction = {
"from": sender,
"to": receiver,
"value": send_value,
"nonce": get_nonce(self.web3, sender),
"gasPrice": 1,
}
transaction["gas"] = self.web3.eth.estimate_gas(transaction)
signed_transaction = self.web3.eth.account.sign_transaction(
transaction, private_key=PK
)
tx_hash = submit_signed_raw_transaction(
self.web3, signed_transaction.rawTransaction
)
tx_receipt = wait_for_transaction_receipt(self.web3, tx_hash)
sender_current_balance = self.web3.eth.get_balance(sender)
receiver_current_balance = self.web3.eth.get_balance(receiver)
self.check_eth_send(
sender_previous_balance,
receiver_previous_balance,
sender_current_balance,
receiver_current_balance,
send_value,
tx_receipt,
)
def test_deploy_erc1155(self):
TOKEN_NAME = "CENTIPEDE-TEST"
TOKEN_SYMBOL = "CNTPD"
TOKEN_URI = "moonstream.to/centipede/"
_, contract_address = deploy_ERC1155(
self.web3,
TOKEN_NAME,
TOKEN_SYMBOL,
TOKEN_URI,
self.tester_address,
self.tester_address,
self.tester_address_pk,
)
base_dir = self.basedir
contract_abi_path = os.path.join(base_dir, "fixture/abis/CentipedeERC1155.json")
with open(contract_abi_path, "r") as ifp:
contract_abi = ifp.read()
contract = self.web3.eth.contract(contract_address, abi=contract_abi)
assert (
contract.functions["name"]().call() == TOKEN_NAME
), "Token name in blockchain != set token name while deploying"
assert (
contract.functions["symbol"]().call() == TOKEN_SYMBOL
), "Token name in blockchain != set token symbol while deploying"
transaction = build_transaction(
self.web3, contract.functions["create"]("1", b""), self.tester_address
)
tx_hash = submit_transaction(self.web3, transaction, self.tester_address_pk)
wait_for_transaction_receipt(self.web3, tx_hash)
assert (
contract.functions["uri"](1).call() == TOKEN_URI + "1"
), "Token with id 1 is not created or has different uri from that is expected"
def test_decode_tx_input(self):
base_dir = self.basedir
contract_abi_path = os.path.join(base_dir, "fixture/abis/CentipedeERC1155.json")
with open(contract_abi_path, "r") as ifp:
contract_abi = ifp.read()
tx_input = "0xf242432a0000000000000000000000004f9a8e7dddee5f9737bafad382fa3bb119fc80c4000000000000000000000000c2485a4a8fbabbb7c39fe7b459816f2f16c238840000000000000000000000000000000000000000000000000000000000000378000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000"
decode_transaction_input(self.web3, tx_input, contract_abi)
if __name__ == "__main__":
unittest.main()

Wyświetl plik

@ -1,62 +0,0 @@
from typing import Tuple
import os
import unittest
from unittest.case import TestCase
from web3 import Web3
from eth_typing.evm import ChecksumAddress
from ..manage import deploy_ERC1155
def read_testnet_env_variables() -> Tuple[Web3, ChecksumAddress, str]:
provider_path = os.environ.get("CENTIPEDE_TESTNET_PATH")
if provider_path is None:
raise ValueError("CENTIPEDE_TESTNET_PATH env variable is not set")
raw_address = os.environ.get("CENTIPEDE_TEST_ETHEREUM_ADDRESS")
if raw_address is None:
raise ValueError("CENTIPEDE_TEST_ETHEREUM_ADDRESS env variable is not set")
private_key = os.environ.get("CENTIPEDE_TEST_ETHEREUM_ADDRESS_PRIVATE_KEY")
if raw_address is None:
raise ValueError(
"CENTIPEDE_TEST_ETHEREUM_ADDRESS_PRIVATE_KEY env variable is not set"
)
return (
Web3(Web3.HTTPProvider(provider_path)),
Web3.toChecksumAddress(raw_address),
private_key,
)
class CentipedeTestnetTestCase(TestCase):
def setUp(self) -> None:
self.basedir = os.path.dirname(os.path.dirname(__file__))
try:
(
self.web3,
self.test_address,
self.test_address_pk,
) = read_testnet_env_variables()
except Exception as e:
raise unittest.SkipTest(f"Skipping test because of : {str(e)}")
def _deploy_contract(self) -> ChecksumAddress:
TOKEN_NAME = "CENTIPEDE-TEST"
TOKEN_SYMBOL = "CNTPD"
TOKEN_URI = "moonstream.to/centipede/"
_, contract_address = deploy_ERC1155(
self.web3,
TOKEN_NAME,
TOKEN_SYMBOL,
TOKEN_URI,
self.test_address,
self.test_address,
self.test_address_pk,
)
return contract_address
def test_deployment(self) -> None:
contract_address = self._deploy_contract()
if __name__ == "__main__":
unittest.main()

Wyświetl plik

@ -1 +0,0 @@
CENTIPEDE_VERSION = "0.0.1"

Wyświetl plik

@ -1,167 +0,0 @@
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple
import os
import getpass
from eth_account.account import Account
from eth_typing.evm import ChecksumAddress
from hexbytes.main import HexBytes
from web3 import Web3, eth
from web3.contract import Contract, ContractFunction
from web3.types import Nonce, TxParams, TxReceipt, Wei
class ContractConstructor:
def __init__(self, *args: Any):
self.args = args
def build_transaction(
web3: Web3,
builder: ContractFunction,
sender: ChecksumAddress,
) -> Dict[str, Any]:
"""
Builds transaction json with the given arguments. It is not submitting transaction
Arguments:
- web3: Web3 client
- builder: ContractFunction or other class that has method buildTransaction(TxParams)
- sender: `from` value of transaction, address which is sending this transaction
- maxFeePerGas: Optional, max priority fee for dynamic fee transactions in Wei
- maxPriorityFeePerGas: Optional the part of the fee that goes to the miner
"""
transaction = builder.buildTransaction(
{
"from": sender,
"nonce": get_nonce(web3, sender),
}
)
return transaction
def get_nonce(web3: Web3, address: ChecksumAddress) -> Nonce:
"""
Returns Nonce: number of transactions for given address
"""
nonce = web3.eth.get_transaction_count(address)
return nonce
def submit_transaction(
web3: Web3, transaction: Dict[str, Any], signer_private_key: str
) -> HexBytes:
"""
Signs and submits json transaction to blockchain from the name of signer
"""
signed_transaction = web3.eth.account.sign_transaction(
transaction, private_key=signer_private_key
)
return submit_signed_raw_transaction(web3, signed_transaction.rawTransaction)
def submit_signed_raw_transaction(
web3: Web3, signed_raw_transaction: HexBytes
) -> HexBytes:
"""
Submits already signed raw transaction.
"""
transaction_hash = web3.eth.send_raw_transaction(signed_raw_transaction)
return transaction_hash
def wait_for_transaction_receipt(web3: Web3, transaction_hash: HexBytes):
return web3.eth.wait_for_transaction_receipt(transaction_hash)
def deploy_contract(
web3: Web3,
contract_bytecode: str,
contract_abi: Dict[str, Any],
deployer: ChecksumAddress,
deployer_private_key: str,
constructor_arguments: Optional[List[Any]] = None,
) -> Tuple[HexBytes, ChecksumAddress]:
"""
Deploys smart contract to blockchain
Arguments:
- web3: web3 client
- contract_bytecode: Compiled smart contract bytecode
- contract_abi: Json abi of contract. Must include `constructor` function
- deployer: Address which is deploying contract. Deployer will pay transaction fee
- deployer_private_key: Private key of deployer. Needed for signing and submitting transaction
- constructor_arguments: arguments that are passed to `constructor` function of the smart contract
"""
contract = web3.eth.contract(abi=contract_abi, bytecode=contract_bytecode)
transaction = build_transaction(
web3, contract.constructor(*constructor_arguments), deployer
)
transaction_hash = submit_transaction(web3, transaction, deployer_private_key)
transaction_receipt = wait_for_transaction_receipt(web3, transaction_hash)
contract_address = transaction_receipt.contractAddress
return transaction_hash, web3.toChecksumAddress(contract_address)
def deploy_contract_from_constructor_function(
web3: Web3,
contract_bytecode: str,
contract_abi: Dict[str, Any],
deployer: ChecksumAddress,
deployer_private_key: str,
constructor: ContractConstructor,
) -> Tuple[HexBytes, ChecksumAddress]:
"""
Deploys smart contract to blockchain from constructor ContractFunction
Arguments:
- web3: web3 client
- contract_bytecode: Compiled smart contract bytecode
- contract_abi: Json abi of contract. Must include `constructor` function
- deployer: Address which is deploying contract. Deployer will pay transaction fee
- deployer_private_key: Private key of deployer. Needed for signing and submitting transaction
- constructor:`constructor` function of the smart contract
"""
contract = web3.eth.contract(abi=contract_abi, bytecode=contract_bytecode)
transaction = build_transaction(
web3, contract.constructor(*constructor.args), deployer
)
transaction_hash = submit_transaction(web3, transaction, deployer_private_key)
transaction_receipt = wait_for_transaction_receipt(web3, transaction_hash)
contract_address = transaction_receipt.contractAddress
return transaction_hash, web3.toChecksumAddress(contract_address)
def decode_transaction_input(web3: Web3, transaction_input: str, abi: Dict[str, Any]):
contract = web3.eth.contract(abi=abi)
return contract.decode_function_input(transaction_input)
def read_keys_from_cli() -> Tuple[ChecksumAddress, str]:
private_key = getpass.getpass(prompt="Enter private key of your address:")
account = Account.from_key(private_key)
return (Web3.toChecksumAddress(account.address), private_key)
def read_keys_from_env() -> Tuple[ChecksumAddress, str]:
private_key = os.environ.get("CENTIPEDE_ETHEREUM_ADDRESS_PRIVATE_KEY")
if private_key is None:
raise ValueError(
"CENTIPEDE_ETHEREUM_ADDRESS_PRIVATE_KEY env variable is not set"
)
account = Account.from_key(private_key)
return (Web3.toChecksumAddress(account), private_key)
def cast_to_python_type(evm_type: str) -> Callable:
if evm_type.startswith(("uint", "int")):
return int
elif evm_type.startswith("bytes"):
return bytes
elif evm_type == "string":
return str
elif evm_type == "address":
return Web3.toChecksumAddress
elif evm_type == "bool":
return bool
else:
raise ValueError(f"Cannot convert to python type {evm_type}")