From 39319c07501e1b60b55a506ae05e492f0fae5203 Mon Sep 17 00:00:00 2001 From: Ivan Grokhotkov Date: Mon, 3 Oct 2022 21:44:28 +0200 Subject: [PATCH] test_apps: move PanicTestDut from conftest into a separate module pytest_panic used to do 'from conftest import PanicTestDut'. This stopped working when another conftest.py file was added in tools/test_build_system/conftest.py. In this case we can't rename conftest.py since both are necessary for pytest to install the hooks in the respective test cases. Fix by moving PanicTestDut into a separate module, then importing it from there. --- tools/test_apps/system/panic/conftest.py | 271 +---------------- tools/test_apps/system/panic/pytest_panic.py | 3 +- .../system/panic/test_panic_util/__init__.py | 5 + .../system/panic/test_panic_util/panic_dut.py | 272 ++++++++++++++++++ 4 files changed, 279 insertions(+), 272 deletions(-) create mode 100644 tools/test_apps/system/panic/test_panic_util/__init__.py create mode 100644 tools/test_apps/system/panic/test_panic_util/panic_dut.py diff --git a/tools/test_apps/system/panic/conftest.py b/tools/test_apps/system/panic/conftest.py index e688deb9c7..dc43e48c82 100644 --- a/tools/test_apps/system/panic/conftest.py +++ b/tools/test_apps/system/panic/conftest.py @@ -3,279 +3,10 @@ # pylint: disable=W0621 # redefined-outer-name -import logging -import os -import subprocess -import sys -from typing import Any, Dict, List, TextIO - -import pexpect import pytest from _pytest.fixtures import FixtureRequest from _pytest.monkeypatch import MonkeyPatch -from panic_utils import NoGdbProcessError, attach_logger, quote_string, sha256, verify_valid_gdb_subprocess -from pygdbmi.gdbcontroller import GdbController -from pytest_embedded_idf.app import IdfApp -from pytest_embedded_idf.dut import IdfDut -from pytest_embedded_idf.serial import IdfSerial - - -class PanicTestDut(IdfDut): - BOOT_CMD_ADDR = 0x9000 - BOOT_CMD_SIZE = 0x1000 - DEFAULT_EXPECT_TIMEOUT = 10 - COREDUMP_UART_START = '================= CORE DUMP START =================' - COREDUMP_UART_END = '================= CORE DUMP END =================' - - app: IdfApp - serial: IdfSerial - - def __init__(self, *args, **kwargs) -> None: # type: ignore - super().__init__(*args, **kwargs) - - self.gdb: GdbController = None # type: ignore - # record this since pygdbmi is using logging.debug to generate some single character mess - self.log_level = logging.getLogger().level - # pygdbmi is using logging.debug to generate some single character mess - if self.log_level <= logging.DEBUG: - logging.getLogger().setLevel(logging.INFO) - - self.coredump_output: TextIO = None # type: ignore - - def close(self) -> None: - if self.gdb: - self.gdb.exit() - - super().close() - - def revert_log_level(self) -> None: - logging.getLogger().setLevel(self.log_level) - - def expect_test_func_name(self, test_func_name: str) -> None: - self.expect_exact('Enter test name:') - self.write(test_func_name) - self.expect_exact('Got test name: ' + test_func_name) - - def expect_none(self, pattern, **kwargs) -> None: # type: ignore - """like dut.expect_all, but with an inverse logic""" - if 'timeout' not in kwargs: - kwargs['timeout'] = 1 - - try: - res = self.expect(pattern, **kwargs) - raise AssertionError(f'Unexpected: {res.group().decode("utf8")}') - except pexpect.TIMEOUT: - pass - - def expect_backtrace(self) -> None: - self.expect_exact('Backtrace:') - self.expect_none('CORRUPTED') - - def expect_gme(self, reason: str) -> None: - """Expect method for Guru Meditation Errors""" - self.expect_exact(f"Guru Meditation Error: Core 0 panic'ed ({reason})") - - def expect_reg_dump(self, core: int = 0) -> None: - """Expect method for the register dump""" - self.expect(r'Core\s+%d register dump:' % core) - - def expect_elf_sha256(self) -> None: - """Expect method for ELF SHA256 line""" - elf_sha256 = sha256(self.app.elf_file) - elf_sha256_len = int( - self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '16') - ) - self.expect_exact('ELF file SHA256: ' + elf_sha256[0:elf_sha256_len]) - - def _call_espcoredump( - self, extra_args: List[str], coredump_file_name: str, output_file_name: str - ) -> None: - # no "with" here, since we need the file to be open for later inspection by the test case - if not self.coredump_output: - self.coredump_output = open(output_file_name, 'w') - - espcoredump_script = os.path.join( - os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py' - ) - espcoredump_args = [ - sys.executable, - espcoredump_script, - 'info_corefile', - '--core', - coredump_file_name, - ] - espcoredump_args += extra_args - espcoredump_args.append(self.app.elf_file) - logging.info('Running %s', ' '.join(espcoredump_args)) - logging.info('espcoredump output is written to %s', self.coredump_output.name) - - subprocess.check_call(espcoredump_args, stdout=self.coredump_output) - self.coredump_output.flush() - self.coredump_output.seek(0) - - def process_coredump_uart(self) -> None: - """Extract the core dump from UART output of the test, run espcoredump on it""" - self.expect(self.COREDUMP_UART_START) - res = self.expect('(.+)' + self.COREDUMP_UART_END) - coredump_base64 = res.group(1).decode('utf8') - with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file: - logging.info('Writing UART base64 core dump to %s', coredump_file.name) - coredump_file.write(coredump_base64) - - output_file_name = os.path.join(self.logdir, 'coredump_uart_result.txt') - self._call_espcoredump( - ['--core-format', 'b64'], coredump_file.name, output_file_name - ) - - def process_coredump_flash(self) -> None: - """Extract the core dump from flash, run espcoredump on it""" - coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin') - logging.info('Writing flash binary core dump to %s', coredump_file_name) - self.serial.dump_flash(partition='coredump', output=coredump_file_name) - - output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt') - self._call_espcoredump( - ['--core-format', 'raw'], coredump_file_name, output_file_name - ) - - def gdb_write(self, command: str) -> Any: - """ - Wrapper to write to gdb with a longer timeout, as test runner - host can be slow sometimes - """ - return self.gdb.write(command, timeout_sec=10) - - def start_gdb(self) -> None: - """ - Runs GDB and connects it to the "serial" port of the DUT. - After this, the DUT expect methods can no longer be used to capture output. - """ - gdb_path = self.toolchain_prefix + 'gdb' - try: - from pygdbmi.constants import GdbTimeoutError - default_gdb_args = ['--nx', '--quiet', '--interpreter=mi2'] - gdb_command = [gdb_path] + default_gdb_args - self.gdb = GdbController(command=gdb_command) - pygdbmi_logger = attach_logger() - except ImportError: - # fallback for pygdbmi<0.10.0.0. - from pygdbmi.gdbcontroller import GdbTimeoutError - self.gdb = GdbController(gdb_path=gdb_path) - pygdbmi_logger = self.gdb.logger - - # pygdbmi logs to console by default, make it log to a file instead - pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt') - pygdbmi_logger.setLevel(logging.DEBUG) - while pygdbmi_logger.hasHandlers(): - pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0]) - log_handler = logging.FileHandler(pygdbmi_log_file_name) - log_handler.setFormatter( - logging.Formatter('%(asctime)s %(levelname)s: %(message)s') - ) - pygdbmi_logger.addHandler(log_handler) - try: - gdb_command = self.gdb.command - except AttributeError: - # fallback for pygdbmi < 0.10 - gdb_command = self.gdb.cmd - - logging.info(f'Running command: "{" ".join(quote_string(c) for c in gdb_command)}"') - for _ in range(10): - try: - # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that - # an RPI under high load will get non-responsive during creating a lot of processes. - if not hasattr(self.gdb, 'verify_valid_gdb_subprocess'): - # for pygdbmi >= 0.10.0.0 - verify_valid_gdb_subprocess(self.gdb.gdb_process) - resp = self.gdb.get_gdb_response( - timeout_sec=10 - ) # calls verify_valid_gdb_subprocess() internally for pygdbmi < 0.10.0.0 - # it will be interesting to look up this response if the next GDB command fails (times out) - logging.info('GDB response: %s', resp) - break # success - except GdbTimeoutError: - logging.warning( - 'GDB internal error: cannot get response from the subprocess' - ) - except NoGdbProcessError: - logging.error('GDB internal error: process is not running') - break # failure - TODO: create another GdbController - except ValueError: - logging.error( - 'GDB internal error: select() returned an unexpected file number' - ) - - # Set up logging for GDB remote protocol - gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt') - self.gdb_write('-gdb-set remotelogfile ' + gdb_remotelog_file_name) - - # Load the ELF file - self.gdb_write('-file-exec-and-symbols {}'.format(self.app.elf_file)) - - # Connect GDB to UART - self.serial.proc.close() - logging.info('Connecting to GDB Stub...') - self.gdb_write('-gdb-set serial baud 115200') - responses = self.gdb_write('-target-select remote ' + self.serial.port) - - # Make sure we get the 'stopped' notification - stop_response = self.find_gdb_response('stopped', 'notify', responses) - if not stop_response: - responses = self.gdb_write('-exec-interrupt') - stop_response = self.find_gdb_response('stopped', 'notify', responses) - assert stop_response - frame = stop_response['payload']['frame'] - if 'file' not in frame: - frame['file'] = '?' - if 'line' not in frame: - frame['line'] = '?' - logging.info('Stopped in {func} at {addr} ({file}:{line})'.format(**frame)) - - # Drain remaining responses - self.gdb.get_gdb_response(raise_error_on_timeout=False) - - def gdb_backtrace(self) -> Any: - """ - Returns the list of stack frames for the current thread. - Each frame is a dictionary, refer to pygdbmi docs for the format. - """ - assert self.gdb - - responses = self.gdb_write('-stack-list-frames') - return self.find_gdb_response('done', 'result', responses)['payload']['stack'] - - @staticmethod - def match_backtrace( - gdb_backtrace: List[Any], expected_functions_list: List[Any] - ) -> bool: - """ - Returns True if the function names listed in expected_functions_list match the backtrace - given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace() - function. - """ - return all( - [ - frame['func'] == expected_functions_list[i] - for i, frame in enumerate(gdb_backtrace) - ] - ) - - @staticmethod - def find_gdb_response( - message: str, response_type: str, responses: List[Any] - ) -> Any: - """ - Helper function which extracts one response from an array of GDB responses, filtering - by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format. - """ - - def match_response(response: Dict[str, Any]) -> bool: - return response['message'] == message and response['type'] == response_type # type: ignore - - filtered_responses = [r for r in responses if match_response(r)] - if not filtered_responses: - return None - return filtered_responses[0] +from test_panic_util import PanicTestDut @pytest.fixture(scope='module') diff --git a/tools/test_apps/system/panic/pytest_panic.py b/tools/test_apps/system/panic/pytest_panic.py index 76f9ff5cc8..f60f384eaf 100644 --- a/tools/test_apps/system/panic/pytest_panic.py +++ b/tools/test_apps/system/panic/pytest_panic.py @@ -6,8 +6,7 @@ from pprint import pformat from typing import List, Optional import pytest - -from conftest import PanicTestDut +from test_panic_util import PanicTestDut CONFIGS = [ pytest.param('coredump_flash_bin_crc', marks=[pytest.mark.esp32, pytest.mark.esp32s2]), diff --git a/tools/test_apps/system/panic/test_panic_util/__init__.py b/tools/test_apps/system/panic/test_panic_util/__init__.py new file mode 100644 index 0000000000..8079526734 --- /dev/null +++ b/tools/test_apps/system/panic/test_panic_util/__init__.py @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +from .panic_dut import PanicTestDut + +__all__ = ['PanicTestDut'] diff --git a/tools/test_apps/system/panic/test_panic_util/panic_dut.py b/tools/test_apps/system/panic/test_panic_util/panic_dut.py new file mode 100644 index 0000000000..e956e0ec5f --- /dev/null +++ b/tools/test_apps/system/panic/test_panic_util/panic_dut.py @@ -0,0 +1,272 @@ +# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +import logging +import os +import subprocess +import sys +from typing import Any, Dict, List, TextIO + +import pexpect +from panic_utils import NoGdbProcessError, attach_logger, quote_string, sha256, verify_valid_gdb_subprocess +from pygdbmi.gdbcontroller import GdbController +from pytest_embedded_idf.app import IdfApp +from pytest_embedded_idf.dut import IdfDut +from pytest_embedded_idf.serial import IdfSerial + + +class PanicTestDut(IdfDut): + BOOT_CMD_ADDR = 0x9000 + BOOT_CMD_SIZE = 0x1000 + DEFAULT_EXPECT_TIMEOUT = 10 + COREDUMP_UART_START = '================= CORE DUMP START =================' + COREDUMP_UART_END = '================= CORE DUMP END =================' + + app: IdfApp + serial: IdfSerial + + def __init__(self, *args, **kwargs) -> None: # type: ignore + super().__init__(*args, **kwargs) + + self.gdb: GdbController = None # type: ignore + # record this since pygdbmi is using logging.debug to generate some single character mess + self.log_level = logging.getLogger().level + # pygdbmi is using logging.debug to generate some single character mess + if self.log_level <= logging.DEBUG: + logging.getLogger().setLevel(logging.INFO) + + self.coredump_output: TextIO = None # type: ignore + + def close(self) -> None: + if self.gdb: + self.gdb.exit() + + super().close() + + def revert_log_level(self) -> None: + logging.getLogger().setLevel(self.log_level) + + def expect_test_func_name(self, test_func_name: str) -> None: + self.expect_exact('Enter test name:') + self.write(test_func_name) + self.expect_exact('Got test name: ' + test_func_name) + + def expect_none(self, pattern, **kwargs) -> None: # type: ignore + """like dut.expect_all, but with an inverse logic""" + if 'timeout' not in kwargs: + kwargs['timeout'] = 1 + + try: + res = self.expect(pattern, **kwargs) + raise AssertionError(f'Unexpected: {res.group().decode("utf8")}') + except pexpect.TIMEOUT: + pass + + def expect_backtrace(self) -> None: + self.expect_exact('Backtrace:') + self.expect_none('CORRUPTED') + + def expect_gme(self, reason: str) -> None: + """Expect method for Guru Meditation Errors""" + self.expect_exact(f"Guru Meditation Error: Core 0 panic'ed ({reason})") + + def expect_reg_dump(self, core: int = 0) -> None: + """Expect method for the register dump""" + self.expect(r'Core\s+%d register dump:' % core) + + def expect_elf_sha256(self) -> None: + """Expect method for ELF SHA256 line""" + elf_sha256 = sha256(self.app.elf_file) + elf_sha256_len = int( + self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '16') + ) + self.expect_exact('ELF file SHA256: ' + elf_sha256[0:elf_sha256_len]) + + def _call_espcoredump( + self, extra_args: List[str], coredump_file_name: str, output_file_name: str + ) -> None: + # no "with" here, since we need the file to be open for later inspection by the test case + if not self.coredump_output: + self.coredump_output = open(output_file_name, 'w') + + espcoredump_script = os.path.join( + os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py' + ) + espcoredump_args = [ + sys.executable, + espcoredump_script, + 'info_corefile', + '--core', + coredump_file_name, + ] + espcoredump_args += extra_args + espcoredump_args.append(self.app.elf_file) + logging.info('Running %s', ' '.join(espcoredump_args)) + logging.info('espcoredump output is written to %s', self.coredump_output.name) + + subprocess.check_call(espcoredump_args, stdout=self.coredump_output) + self.coredump_output.flush() + self.coredump_output.seek(0) + + def process_coredump_uart(self) -> None: + """Extract the core dump from UART output of the test, run espcoredump on it""" + self.expect(self.COREDUMP_UART_START) + res = self.expect('(.+)' + self.COREDUMP_UART_END) + coredump_base64 = res.group(1).decode('utf8') + with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file: + logging.info('Writing UART base64 core dump to %s', coredump_file.name) + coredump_file.write(coredump_base64) + + output_file_name = os.path.join(self.logdir, 'coredump_uart_result.txt') + self._call_espcoredump( + ['--core-format', 'b64'], coredump_file.name, output_file_name + ) + + def process_coredump_flash(self) -> None: + """Extract the core dump from flash, run espcoredump on it""" + coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin') + logging.info('Writing flash binary core dump to %s', coredump_file_name) + self.serial.dump_flash(partition='coredump', output=coredump_file_name) + + output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt') + self._call_espcoredump( + ['--core-format', 'raw'], coredump_file_name, output_file_name + ) + + def gdb_write(self, command: str) -> Any: + """ + Wrapper to write to gdb with a longer timeout, as test runner + host can be slow sometimes + """ + return self.gdb.write(command, timeout_sec=10) + + def start_gdb(self) -> None: + """ + Runs GDB and connects it to the "serial" port of the DUT. + After this, the DUT expect methods can no longer be used to capture output. + """ + gdb_path = self.toolchain_prefix + 'gdb' + try: + from pygdbmi.constants import GdbTimeoutError + default_gdb_args = ['--nx', '--quiet', '--interpreter=mi2'] + gdb_command = [gdb_path] + default_gdb_args + self.gdb = GdbController(command=gdb_command) + pygdbmi_logger = attach_logger() + except ImportError: + # fallback for pygdbmi<0.10.0.0. + from pygdbmi.gdbcontroller import GdbTimeoutError + self.gdb = GdbController(gdb_path=gdb_path) + pygdbmi_logger = self.gdb.logger + + # pygdbmi logs to console by default, make it log to a file instead + pygdbmi_log_file_name = os.path.join(self.logdir, 'pygdbmi_log.txt') + pygdbmi_logger.setLevel(logging.DEBUG) + while pygdbmi_logger.hasHandlers(): + pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0]) + log_handler = logging.FileHandler(pygdbmi_log_file_name) + log_handler.setFormatter( + logging.Formatter('%(asctime)s %(levelname)s: %(message)s') + ) + pygdbmi_logger.addHandler(log_handler) + try: + gdb_command = self.gdb.command + except AttributeError: + # fallback for pygdbmi < 0.10 + gdb_command = self.gdb.cmd + + logging.info(f'Running command: "{" ".join(quote_string(c) for c in gdb_command)}"') + for _ in range(10): + try: + # GdbController creates a process with subprocess.Popen(). Is it really running? It is probable that + # an RPI under high load will get non-responsive during creating a lot of processes. + if not hasattr(self.gdb, 'verify_valid_gdb_subprocess'): + # for pygdbmi >= 0.10.0.0 + verify_valid_gdb_subprocess(self.gdb.gdb_process) + resp = self.gdb.get_gdb_response( + timeout_sec=10 + ) # calls verify_valid_gdb_subprocess() internally for pygdbmi < 0.10.0.0 + # it will be interesting to look up this response if the next GDB command fails (times out) + logging.info('GDB response: %s', resp) + break # success + except GdbTimeoutError: + logging.warning( + 'GDB internal error: cannot get response from the subprocess' + ) + except NoGdbProcessError: + logging.error('GDB internal error: process is not running') + break # failure - TODO: create another GdbController + except ValueError: + logging.error( + 'GDB internal error: select() returned an unexpected file number' + ) + + # Set up logging for GDB remote protocol + gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt') + self.gdb_write('-gdb-set remotelogfile ' + gdb_remotelog_file_name) + + # Load the ELF file + self.gdb_write('-file-exec-and-symbols {}'.format(self.app.elf_file)) + + # Connect GDB to UART + self.serial.proc.close() + logging.info('Connecting to GDB Stub...') + self.gdb_write('-gdb-set serial baud 115200') + responses = self.gdb_write('-target-select remote ' + self.serial.port) + + # Make sure we get the 'stopped' notification + stop_response = self.find_gdb_response('stopped', 'notify', responses) + if not stop_response: + responses = self.gdb_write('-exec-interrupt') + stop_response = self.find_gdb_response('stopped', 'notify', responses) + assert stop_response + frame = stop_response['payload']['frame'] + if 'file' not in frame: + frame['file'] = '?' + if 'line' not in frame: + frame['line'] = '?' + logging.info('Stopped in {func} at {addr} ({file}:{line})'.format(**frame)) + + # Drain remaining responses + self.gdb.get_gdb_response(raise_error_on_timeout=False) + + def gdb_backtrace(self) -> Any: + """ + Returns the list of stack frames for the current thread. + Each frame is a dictionary, refer to pygdbmi docs for the format. + """ + assert self.gdb + + responses = self.gdb_write('-stack-list-frames') + return self.find_gdb_response('done', 'result', responses)['payload']['stack'] + + @staticmethod + def match_backtrace( + gdb_backtrace: List[Any], expected_functions_list: List[Any] + ) -> bool: + """ + Returns True if the function names listed in expected_functions_list match the backtrace + given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace() + function. + """ + return all( + [ + frame['func'] == expected_functions_list[i] + for i, frame in enumerate(gdb_backtrace) + ] + ) + + @staticmethod + def find_gdb_response( + message: str, response_type: str, responses: List[Any] + ) -> Any: + """ + Helper function which extracts one response from an array of GDB responses, filtering + by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format. + """ + + def match_response(response: Dict[str, Any]) -> bool: + return response['message'] == message and response['type'] == response_type # type: ignore + + filtered_responses = [r for r in responses if match_response(r)] + if not filtered_responses: + return None + return filtered_responses[0]