moonstream/moonstreamapi/moonstreamapi/abi_decoder.py

116 wiersze
3.6 KiB
Python
Czysty Zwykły widok Historia

2021-07-28 17:55:17 +00:00
import argparse
2021-07-28 16:59:13 +00:00
import binascii
2021-07-28 17:55:17 +00:00
import sys
2021-12-16 13:26:04 +00:00
from typing import List, Optional, Type, Union, cast
2021-07-28 17:55:17 +00:00
import pyevmasm
2021-07-28 16:59:13 +00:00
from moonstreamdb.models import ESDEventSignature, ESDFunctionSignature
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import text
2021-12-16 13:26:04 +00:00
from moonstreamdb.db import yield_db_session
from .data import ContractABI, EVMEventSignature, EVMFunctionSignature
2021-07-28 16:59:13 +00:00
def query_for_text_signatures(
session: Session,
hex_signature: str,
db_model: Union[ESDFunctionSignature, ESDEventSignature],
) -> List[str]:
2021-07-28 16:59:13 +00:00
query = session.query(db_model)
query = query.filter(db_model.hex_signature == hex_signature)
results = query.all()
text_signatures = []
for el in results:
text_signatures.append(el.text_signature)
return text_signatures
2021-07-28 17:22:16 +00:00
2021-07-28 16:59:13 +00:00
def decode_signatures(
2021-07-28 17:22:16 +00:00
session: Session,
hex_signatures: List[str],
data_model: Union[Type[EVMEventSignature], Type[EVMFunctionSignature]],
db_model: Union[ESDEventSignature, ESDFunctionSignature],
2021-07-28 16:59:13 +00:00
) -> List[Union[EVMEventSignature, EVMFunctionSignature]]:
decoded_signatures = []
for hex_signature in hex_signatures:
signature = data_model(hex_signature=hex_signature)
signature.text_signature_candidates = query_for_text_signatures(
session, hex_signature, db_model
)
decoded_signatures.append(signature)
return decoded_signatures
2021-07-28 17:22:16 +00:00
2021-07-28 17:55:17 +00:00
def decode_abi(source: str, session: Optional[Session] = None) -> ContractABI:
normalized_source = source
if normalized_source[:2] == "0x":
normalized_source = normalized_source[2:]
disassembled = pyevmasm.disassemble_all(binascii.unhexlify(normalized_source))
2021-07-28 16:59:13 +00:00
function_hex_signatures = []
event_hex_signatures = []
2021-07-28 17:55:17 +00:00
should_close_session = False
if session is None:
should_close_session = True
session = next(yield_db_session())
2021-07-28 16:59:13 +00:00
for instruction in disassembled:
if instruction.name == "PUSH4":
hex_signature = "0x{:x}".format(instruction.operand)
if hex_signature not in function_hex_signatures:
function_hex_signatures.append(hex_signature)
elif instruction.name == "PUSH32":
hex_signature = "0x{:x}".format(instruction.operand)
if hex_signature not in event_hex_signatures:
event_hex_signatures.append(hex_signature)
2021-07-28 17:55:17 +00:00
try:
2021-07-28 17:22:16 +00:00
function_signatures = decode_signatures(
session, function_hex_signatures, EVMFunctionSignature, ESDFunctionSignature
)
event_signatures = decode_signatures(
session, event_hex_signatures, EVMEventSignature, ESDEventSignature
)
2021-07-28 17:55:17 +00:00
finally:
if should_close_session:
session.close()
2021-07-28 17:22:16 +00:00
2021-07-28 17:55:17 +00:00
abi = ContractABI(
2023-03-07 14:54:05 +00:00
functions=[
cast(EVMFunctionSignature, function_signature)
for function_signature in function_signatures
],
events=[
cast(EVMEventSignature, event_signature)
for event_signature in event_signatures
],
2021-07-28 17:55:17 +00:00
)
return abi
def main() -> None:
parser = argparse.ArgumentParser(description="Decode Ethereum smart contract ABIs")
parser.add_argument(
"-i",
"--infile",
type=argparse.FileType("r"),
default=sys.stdin,
help="File containing the ABI to decode",
)
args = parser.parse_args()
source: Optional[str] = None
with args.infile as ifp:
source = ifp.read().strip()
if source is None:
raise ValueError("Could not read ABI.")
2021-07-28 17:55:17 +00:00
abi = decode_abi(source)
print(abi.json())
if __name__ == "__main__":
main()