esp-idf/tools/test_apps/system/panic/test_panic_util/test_panic_util.py

280 wiersze
11 KiB
Python

import logging
import os
from pygdbmi.gdbcontroller import GdbController
import re
import subprocess
import sys
import ttfw_idf
from tiny_test_fw import Utility, TinyFW, DUT
from tiny_test_fw.Utility import SearchCases, CaseConfig
# Directory one level above this file, expressed relative to IDF_PATH.
# Hard-coded on purpose: this module is only intended to be used from the panic test app.
TEST_PATH = os.path.relpath(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    os.environ.get("IDF_PATH"),
)
# Name passed to the test framework as the suite name and used to pick the log folder
TEST_SUITE = "Panic"
def ok(data):
    """ Callback for dut.expect_any: accepts the matched data and does nothing with it """
    return None
def unexpected(data):
    """
    Callback for dut.expect_any: fails the test when its pattern is matched.

    :param data: the matched data, included in the error message
    :raises AssertionError: always
    """
    message = "Unexpected: {}".format(data)
    raise AssertionError(message)
class PanicTestApp(ttfw_idf.TestApp):
    """ App class used by the panic tests; adds nothing on top of ttfw_idf.TestApp """
class PanicTestMixin(object):
    """ Provides custom functionality for the panic test DUT

    This class has no transport of its own. It relies on the class it is mixed into for
    expect, expect_any, write, start_app, start_capture_raw_data, stop_capture_raw_data,
    stop_receive, _port_close, dump_flush, close and get_gdb_remote, and for the
    app and TOOLCHAIN_PREFIX attributes.

    Instances are used as context managers: __enter__ initialises the state used by the helpers
    below, __exit__ saves the captured output and releases GDB and the DUT.
    """
    # Not referenced inside this class; presumably the flash region of the boot command - TODO confirm
    BOOT_CMD_ADDR = 0x9000
    BOOT_CMD_SIZE = 0x1000
    # Not referenced inside this class; presumably read by the DUT base class - TODO confirm
    DEFAULT_EXPECT_TIMEOUT = 10
    # Markers which surround the base64 core dump in the UART output
    COREDUMP_UART_START = "================= CORE DUMP START ================="
    COREDUMP_UART_END = "================= CORE DUMP END ================="

    def start_test(self, test_name):
        """ Starts the app and sends it the test name

        :param test_name: name written to the app once it prompts for it; also used in log file names
        """
        self.test_name = test_name
        # Start the app and verify that it has started up correctly
        self.start_capture_raw_data()
        self.start_app()
        self.expect("Enter test name: ")
        Utility.console_log("Setting boot command: " + test_name)
        self.write(test_name)
        self.expect("Got test name: " + test_name)

    def expect_none(self, *patterns, **timeout_args):
        """ like dut.expect_all, but with an inverse logic

        :param patterns: patterns which must not show up in the DUT output
        :param timeout_args: keyword arguments forwarded to expect_any; "timeout" defaults to 1
        :return: True if expect_any timed out without matching any of the patterns
        :raises AssertionError: if one of the patterns was matched
        """
        found_data = []
        timeout_args.setdefault("timeout", 1)

        def found(data):
            # Record the match, then fail right away with the offending data
            found_data.append(data)
            raise AssertionError("Unexpected: {}".format(data))

        expect_items = [(pattern, found) for pattern in patterns]
        try:
            self.expect_any(*expect_items, **timeout_args)
        except DUT.ExpectTimeout:
            return True
        # expect_any returned normally: something matched even though the callback did not raise
        raise AssertionError("Unexpected: {}".format(found_data))

    def expect_gme(self, reason):
        """ Expect method for Guru Meditation Errors

        :param reason: text expected between the parentheses of the panic message
        """
        self.expect(r"Guru Meditation Error: Core 0 panic'ed (%s)" % reason)

    def expect_reg_dump(self, core=0):
        """ Expect method for the register dump

        :param core: number of the core whose register dump is expected
        """
        self.expect(re.compile(r"Core\s+%d register dump:" % core))

    def expect_elf_sha256(self):
        """ Expect method for ELF SHA256 line

        Only the first CONFIG_APP_RETRIEVE_LEN_ELF_SHA characters of the app's hash are expected
        (16 if the option is missing from sdkconfig).
        """
        elf_sha256 = self.app.get_elf_sha256()
        sdkconfig = self.app.get_sdkconfig()
        elf_sha256_len = int(sdkconfig.get("CONFIG_APP_RETRIEVE_LEN_ELF_SHA", "16"))
        self.expect("ELF file SHA256: " + elf_sha256[0:elf_sha256_len])

    def expect_backtrace(self):
        """ Expect the backtrace header and check that "CORRUPTED" does not follow it """
        self.expect("Backtrace:")
        self.expect_none("CORRUPTED")

    def __enter__(self):
        """ Resets the cached raw data and the GDB handle, returns the DUT itself """
        self._raw_data = None
        self.gdb = None
        return self

    def __exit__(self, type, value, traceback):
        """ Writes the captured output to the log folder, then exits GDB (if started) and closes the DUT """
        try:
            log_folder = self.app.get_log_folder(TEST_SUITE)
            with open(os.path.join(log_folder, "log_" + self.test_name + ".txt"), "w") as log_file:
                Utility.console_log("Writing output of {} to {}".format(self.test_name, log_file.name))
                log_file.write(self.get_raw_data())
        finally:
            # Release GDB and the DUT even if saving the log has failed
            try:
                if self.gdb:
                    self.gdb.exit()
            finally:
                self.close()

    def get_raw_data(self):
        """ Returns the raw output captured since start_test; the capture is stopped on the first call

        The result is cached, so the capture is stopped only once even if nothing was captured.
        """
        if self._raw_data is None:
            self._raw_data = self.stop_capture_raw_data()
        return self._raw_data

    def _call_espcoredump(self, extra_args, coredump_file_name, output_file_name):
        """ Runs "espcoredump.py info_corefile" on a core dump file

        :param extra_args: list of additional espcoredump.py arguments, placed before the ELF file name
        :param coredump_file_name: file passed to espcoredump.py as --core
        :param output_file_name: file which receives the stdout of espcoredump.py
        :raises subprocess.CalledProcessError: if espcoredump.py exits with a non-zero code

        The output file is left open, rewound to the start, as self.coredump_output.
        """
        # no "with" here, since we need the file to be open for later inspection by the test case;
        # "w+" (not "w") so that the test case can actually read it back
        self.coredump_output = open(output_file_name, "w+")
        espcoredump_script = os.path.join(os.environ["IDF_PATH"], "components", "espcoredump", "espcoredump.py")
        espcoredump_args = [
            sys.executable,
            espcoredump_script,
            "info_corefile",
            "--core", coredump_file_name,
        ]
        espcoredump_args += extra_args
        espcoredump_args.append(self.app.elf_file)
        Utility.console_log("Running " + " ".join(espcoredump_args))
        Utility.console_log("espcoredump output is written to " + self.coredump_output.name)
        subprocess.check_call(espcoredump_args, stdout=self.coredump_output)
        self.coredump_output.flush()
        self.coredump_output.seek(0)

    def process_coredump_uart(self):
        """ Extract the core dump from UART output of the test, run espcoredump on it

        :raises AssertionError: if the core dump start or end marker is missing from the output
        """
        data = self.get_raw_data()
        coredump_start = data.find(self.COREDUMP_UART_START)
        if coredump_start < 0:
            raise AssertionError("Core dump start marker not found in the UART output")
        coredump_start += len(self.COREDUMP_UART_START)
        coredump_end = data.find(self.COREDUMP_UART_END, coredump_start)
        if coredump_end < 0:
            raise AssertionError("Core dump end marker not found in the UART output")
        coredump_base64 = data[coredump_start:coredump_end]

        log_folder = self.app.get_log_folder(TEST_SUITE)
        with open(os.path.join(log_folder, "coredump_data_" + self.test_name + ".b64"), "w") as coredump_file:
            Utility.console_log("Writing UART base64 core dump to " + coredump_file.name)
            coredump_file.write(coredump_base64)

        # The file is closed (and therefore flushed) at this point, espcoredump can read it
        output_file_name = os.path.join(log_folder, "coredump_uart_result_" + self.test_name + ".txt")
        self._call_espcoredump(["--core-format", "b64"], coredump_file.name, output_file_name)

    def process_coredump_flash(self):
        """ Extract the core dump from flash, run espcoredump on it """
        log_folder = self.app.get_log_folder(TEST_SUITE)
        coredump_file_name = os.path.join(log_folder, "coredump_data_" + self.test_name + ".bin")
        Utility.console_log("Writing flash binary core dump to " + coredump_file_name)
        self.dump_flush(coredump_file_name, partition="coredump")

        output_file_name = os.path.join(log_folder, "coredump_flash_result_" + self.test_name + ".txt")
        self._call_espcoredump(["--core-format", "raw"], coredump_file_name, output_file_name)

    def start_gdb(self):
        """
        Runs GDB and connects it to the "serial" port of the DUT.
        After this, the DUT expect methods can no longer be used to capture output.
        """
        self.stop_receive()
        self._port_close()

        Utility.console_log("Starting GDB...", "orange")
        self.gdb = GdbController(gdb_path=self.TOOLCHAIN_PREFIX + "gdb")

        # pygdbmi logs to console by default, make it log to a file instead
        log_folder = self.app.get_log_folder(TEST_SUITE)
        pygdbmi_log_file_name = os.path.join(log_folder, "pygdbmi_log_" + self.test_name + ".txt")
        pygdbmi_logger = self.gdb.logger
        pygdbmi_logger.setLevel(logging.DEBUG)
        # Remove only the handlers attached to this logger. Logger.hasHandlers() also reports
        # handlers of ancestor loggers, so it must not be used to decide whether handlers[0] exists.
        for handler in list(pygdbmi_logger.handlers):
            pygdbmi_logger.removeHandler(handler)
        log_handler = logging.FileHandler(pygdbmi_log_file_name)
        log_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
        pygdbmi_logger.addHandler(log_handler)

        # Set up logging for GDB remote protocol
        gdb_remotelog_file_name = os.path.join(log_folder, "gdb_remote_log_" + self.test_name + ".txt")
        self.gdb.write("-gdb-set remotelogfile " + gdb_remotelog_file_name)

        # Load the ELF file
        self.gdb.write("-file-exec-and-symbols {}".format(self.app.elf_file))

        # Connect GDB to UART
        Utility.console_log("Connecting to GDB Stub...", "orange")
        self.gdb.write("-gdb-set serial baud 115200")
        responses = self.gdb.write("-target-select remote " + self.get_gdb_remote(), timeout_sec=3)

        # Make sure we get the 'stopped' notification
        stop_response = self.find_gdb_response('stopped', 'notify', responses)
        if not stop_response:
            # Not stopped yet: interrupt the target and look for the notification again
            responses = self.gdb.write("-exec-interrupt", timeout_sec=3)
            stop_response = self.find_gdb_response('stopped', 'notify', responses)
            assert stop_response
        frame = stop_response["payload"]["frame"]
        # The log message below needs both keys, fill in placeholders when GDB did not report them
        if "file" not in frame:
            frame["file"] = "?"
        if "line" not in frame:
            frame["line"] = "?"
        Utility.console_log("Stopped in {func} at {addr} ({file}:{line})".format(**frame), "orange")

        # Drain remaining responses
        self.gdb.get_gdb_response(raise_error_on_timeout=False)

    def gdb_backtrace(self):
        """
        Returns the list of stack frames for the current thread.
        Each frame is a dictionary, refer to pygdbmi docs for the format.
        Requires GDB to be running, see start_gdb().
        """
        assert self.gdb
        responses = self.gdb.write("-stack-list-frames", timeout_sec=3)
        return self.find_gdb_response("done", "result", responses)["payload"]["stack"]

    @staticmethod
    def match_backtrace(gdb_backtrace, expected_functions_list):
        """
        Returns True if the function names listed in expected_functions_list match the backtrace
        given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace()
        function.

        Frames are compared position by position. A backtrace shorter than expected_functions_list
        matches if all of its frames do; a longer one does not match.
        """
        if len(gdb_backtrace) > len(expected_functions_list):
            # More frames than expected names: a mismatch, not an IndexError
            return False
        return all(frame["func"] == expected
                   for frame, expected in zip(gdb_backtrace, expected_functions_list))

    @staticmethod
    def find_gdb_response(message, response_type, responses):
        """
        Helper function which extracts one response from an array of GDB responses, filtering
        by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.
        The first matching response is returned, or None if there is no match.
        """
        for response in responses:
            if response["message"] == message and response["type"] == response_type:
                return response
        return None
class ESP32PanicTestDUT(ttfw_idf.ESP32DUT, PanicTestMixin):
    """ ESP32 DUT combined with the panic test helpers of PanicTestMixin """

    def get_gdb_remote(self):
        """ Returns the remote target GDB should connect to: the port of this DUT """
        gdb_remote = self.port
        return gdb_remote
def panic_test(**kwargs):
    """
    Decorator for the panic tests, sets correct App and DUT classes.

    :param kwargs: additional keyword arguments, forwarded to ttfw_idf.idf_custom_test
    :return: whatever ttfw_idf.idf_custom_test returns for these arguments
    """
    make_decorator = ttfw_idf.idf_custom_test
    return make_decorator(
        app=PanicTestApp,
        dut=ESP32PanicTestDUT,
        env_tag="test_jtag_arm",
        **kwargs
    )
def get_dut(env, app_config_name, test_name, qemu_wdt_enable=False):
    """ Wrapper for getting the DUT and starting the test

    :param env: test environment, its get_dut method is used to obtain the DUT
    :param app_config_name: forwarded to env.get_dut as app_config_name
    :param test_name: name of the test, passed to dut.start_test
    :param qemu_wdt_enable: stored on the DUT as the qemu_wdt_enable attribute before the test starts
    :return: the DUT, with the test already started
    """
    dut = env.get_dut("panic", TEST_PATH, app_config_name=app_config_name, allow_dut_exception=True)
    dut.qemu_wdt_enable = qemu_wdt_enable
    dut.start_test(test_name)
    return dut
def run_all(filename, case_filter=None):
    """ Helper function to run test cases defined in a file; to be called from __main__.
    case_filter is an optional list of case names to run.
    If not specified, all test cases are run.
    If the first entry of case_filter ends with "*", it is treated as a name prefix and
    the remaining entries are not looked at.

    :param filename: file to search for test cases
    :param case_filter: optional list of test case names, see above
    :raises SystemExit: with exit code 1 if at least one test case has failed
    """
    TinyFW.set_default_config(env_config_file=None, test_suite_name=TEST_SUITE)
    test_methods = SearchCases.Search.search_test_cases(filename)
    # Build a list rather than a lazy filter object, so the result can be iterated more than once
    test_methods = [m for m in test_methods if not m.case_info["ignore"]]
    test_cases = CaseConfig.Parser.apply_config(test_methods, None)

    def is_selected(test_name):
        """ Tells whether the case passes case_filter """
        if not case_filter:
            return True
        first_entry = case_filter[0]
        if first_entry.endswith("*"):
            return test_name.startswith(first_entry[:-1])
        return test_name in case_filter

    tests_failed = []
    for case in test_cases:
        if not is_selected(case.test_method.__name__):
            continue
        result = case.run()
        if not result:
            tests_failed.append(case)

    if tests_failed:
        print("The following tests have failed:")
        for case in tests_failed:
            print(" - " + case.test_method.__name__)
        raise SystemExit(1)

    print("Tests pass")