From 252036567c7f07e6244ac82047df3cb589100349 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Ga=C5=88o?= Date: Fri, 14 May 2021 11:20:38 +0200 Subject: [PATCH] Moved filters out of idf_monitor.py --- .gitlab/ci/rules.yml | 2 +- tools/ci/mypy_ignore_list.txt | 1 - tools/gdb_panic_server.py | 6 +- tools/idf_monitor.py | 381 ++++++----------------- tools/idf_monitor_base/__init__.py | 33 ++ tools/idf_monitor_base/console_parser.py | 2 +- tools/idf_monitor_base/gdbhelper.py | 131 ++++++++ tools/idf_monitor_base/output_helpers.py | 122 +++++++- 8 files changed, 383 insertions(+), 295 deletions(-) create mode 100644 tools/idf_monitor_base/gdbhelper.py diff --git a/.gitlab/ci/rules.yml b/.gitlab/ci/rules.yml index b10a61eea8..ba46aca194 100644 --- a/.gitlab/ci/rules.yml +++ b/.gitlab/ci/rules.yml @@ -96,7 +96,7 @@ - "tools/esp_app_trace/**/*" - "tools/ldgen/**/*" - - "tools/gdb_panic_server.py" + - "tools/idf_monitor_base/*" - "tools/idf_monitor.py" - "tools/test_idf_monitor/**/*" diff --git a/tools/ci/mypy_ignore_list.txt b/tools/ci/mypy_ignore_list.txt index 37824dc4a4..766c57e3dc 100644 --- a/tools/ci/mypy_ignore_list.txt +++ b/tools/ci/mypy_ignore_list.txt @@ -219,7 +219,6 @@ tools/esp_prov/utils/convenience.py tools/find_apps.py tools/find_build_apps/common.py tools/find_build_apps/make.py -tools/gdb_panic_server.py tools/gen_esp_err_to_name.py tools/idf.py tools/idf_py_actions/core_ext.py diff --git a/tools/gdb_panic_server.py b/tools/gdb_panic_server.py index fab5b59d62..89b5ab93e0 100644 --- a/tools/gdb_panic_server.py +++ b/tools/gdb_panic_server.py @@ -75,7 +75,7 @@ GDB_REGS_INFO = { PanicInfo = namedtuple('PanicInfo', 'core_id regs stack_base_addr stack_data') -def build_riscv_panic_output_parser(): # type: () -> typing.Type[ParserElement] +def build_riscv_panic_output_parser(): # type: () -> typing.Any[typing.Type[ParserElement]] """Builds a parser for the panic handler output using pyparsing""" # We don't match the first line, since "Guru Meditation" will not be printed in case of an abort: @@ -258,7 +258,7 @@ class GdbServer(object): # For any memory address that is not on the stack, pretend the value is 0x00. # GDB should never ask us for program memory, it will be obtained from the ELF file. - def in_stack(addr): + def in_stack(addr): # type: (int) -> typing.Any[bool] return stack_addr_min <= addr < stack_addr_max result = '' @@ -271,7 +271,7 @@ class GdbServer(object): self._respond(result) -def main(): +def main(): # type: () -> None parser = argparse.ArgumentParser() parser.add_argument('input_file', type=argparse.FileType('r'), help='File containing the panic handler output') diff --git a/tools/idf_monitor.py b/tools/idf_monitor.py index 670d276dbe..038dde1726 100755 --- a/tools/idf_monitor.py +++ b/tools/idf_monitor.py @@ -34,32 +34,36 @@ from __future__ import division, print_function, unicode_literals import argparse import codecs -import datetime import os import re import subprocess import threading import time from builtins import bytes, object -from typing import AnyStr, BinaryIO, Callable, List, Optional, Union -import serial.tools.miniterm as miniterm +try: + from typing import List, Optional, Union # noqa +except ImportError: + pass + from idf_monitor_base import (COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO, COREDUMP_DONE, COREDUMP_IDLE, COREDUMP_READING, COREDUMP_UART_END, COREDUMP_UART_PROMPT, COREDUMP_UART_START, DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, MATCH_PCADDR, PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE, PANIC_READING, PANIC_STACK_DUMP, - PANIC_START) + PANIC_START, prompt_next_action) from idf_monitor_base.chip_specific_config import get_chip_config from idf_monitor_base.console_parser import ConsoleParser from idf_monitor_base.console_reader import ConsoleReader from idf_monitor_base.constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, - CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H, CTRL_T, TAG_CMD, - TAG_KEY, TAG_SERIAL, TAG_SERIAL_FLUSH) + CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H, TAG_CMD, TAG_KEY, + TAG_SERIAL, TAG_SERIAL_FLUSH) from idf_monitor_base.exceptions import SerialStopException +from idf_monitor_base.gdbhelper import GDBHelper from idf_monitor_base.line_matcher import LineMatcher -from idf_monitor_base.output_helpers import normal_print, red_print, yellow_print +from idf_monitor_base.output_helpers import Logger, lookup_pc_address, normal_print, yellow_print from idf_monitor_base.serial_reader import SerialReader from idf_monitor_base.web_socket_client import WebSocketClient +from serial.tools import miniterm try: import queue # noqa @@ -89,21 +93,21 @@ class Monitor(object): """ def __init__( - self, - serial_instance, # type: serial.Serial - elf_file, # type: str - print_filter, # type: str - make='make', # type: str - encrypted=False, # type: bool - toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, # type: str - eol='CRLF', # type: str - decode_coredumps=COREDUMP_DECODE_INFO, # type: str - decode_panic=PANIC_DECODE_DISABLE, # type: str - target='esp32', # type: str - websocket_client=None, # type: WebSocketClient - enable_address_decoding=True, # type: bool - timestamps=False, # type: bool - timestamp_format='' # type: str + self, + serial_instance, # type: serial.Serial + elf_file, # type: str + print_filter, # type: str + make='make', # type: str + encrypted=False, # type: bool + toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, # type: str + eol='CRLF', # type: str + decode_coredumps=COREDUMP_DECODE_INFO, # type: str + decode_panic=PANIC_DECODE_DISABLE, # type: str + target='esp32', # type: str + websocket_client=None, # type: WebSocketClient + enable_address_decoding=True, # type: bool + timestamps=False, # type: bool + timestamp_format='' # type: str ): super(Monitor, self).__init__() self.event_queue = queue.Queue() # type: queue.Queue @@ -136,24 +140,21 @@ class Monitor(object): # internal state self._last_line_part = b'' - self._gdb_buffer = b'' self._pc_address_buffer = b'' self._line_matcher = LineMatcher(print_filter) self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer] self._force_line_print = False - self._output_enabled = True self._serial_check_exit = socket_mode - self._log_file = None # type: Optional[BinaryIO] self._decode_coredumps = decode_coredumps self._reading_coredump = COREDUMP_IDLE self._coredump_buffer = b'' self._decode_panic = decode_panic self._reading_panic = PANIC_IDLE self._panic_buffer = b'' - self.gdb_exit = False self.start_cmd_sent = False - self._timestamps = timestamps - self._timestamp_format = timestamp_format + self.gdb_helper = GDBHelper(self.toolchain_prefix, self.websocket_client, self.elf_file, self.serial.port, + self.serial.baudrate) + self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format) def invoke_processing_last_line(self): # type: () -> None @@ -163,13 +164,13 @@ class Monitor(object): # type: () -> None self.console_reader.start() self.serial_reader.start() - self.gdb_exit = False + self.gdb_helper.gdb_exit = False self.start_cmd_sent = False try: while self.console_reader.alive and self.serial_reader.alive: try: - if self.gdb_exit is True: - self.gdb_exit = False + if self.gdb_helper.gdb_exit: + self.gdb_helper.gdb_exit = False time.sleep(0.3) try: @@ -204,7 +205,8 @@ class Monitor(object): self.handle_serial_input(data) if self._invoke_processing_last_line_timer is not None: self._invoke_processing_last_line_timer.cancel() - self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line) + self._invoke_processing_last_line_timer = threading.Timer(0.1, + self.invoke_processing_last_line) self._invoke_processing_last_line_timer.start() # If no futher data is received in the next short period # of time then the _invoke_processing_last_line_timer @@ -214,7 +216,7 @@ class Monitor(object): elif event_tag == TAG_SERIAL_FLUSH: self.handle_serial_input(data, finalize_line=True) else: - raise RuntimeError('Bad event data %r' % ((event_tag,data),)) + raise RuntimeError('Bad event data %r' % ((event_tag, data),)) except KeyboardInterrupt: try: yellow_print('To exit from IDF monitor please use \"Ctrl+]\"') @@ -231,7 +233,7 @@ class Monitor(object): try: self.console_reader.stop() self.serial_reader.stop() - self.stop_logging() + self.logger.stop_logging() # Cancelling _invoke_processing_last_line_timer is not # important here because receiving empty data doesn't matter. self._invoke_processing_last_line_timer = None @@ -239,6 +241,11 @@ class Monitor(object): pass normal_print('\n') + def check_gdb_stub_and_run(self, line): # type: (bytes) -> None + if self.gdb_helper.check_gdb_stub_trigger(line): + with self: # disable console control + self.gdb_helper.run_gdb() + def handle_serial_input(self, data, finalize_line=False): # type: (bytes, bool) -> None # Remove "+" after Continue command @@ -257,47 +264,53 @@ class Monitor(object): # last part is not a full line self._last_line_part = sp.pop() for line in sp: - if line != b'': - if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'): - raise SerialStopException() - self.check_panic_decode_trigger(line) - self.check_coredump_trigger_before_print(line) - if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')): - self._print(line + b'\n') - self.handle_possible_pc_address_in_line(line) - self.check_coredump_trigger_after_print() - self.check_gdbstub_trigger(line) - self._force_line_print = False + if line == b'': + continue + if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'): + raise SerialStopException() + self.check_panic_decode_trigger(line) + self.check_coredump_trigger_before_print(line) + if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')): + self.logger.print(line + b'\n') + self.handle_possible_pc_address_in_line(line) + self.check_coredump_trigger_after_print() + self.check_gdb_stub_and_run(line) + self._force_line_print = False # Now we have the last part (incomplete line) in _last_line_part. By # default we don't touch it and just wait for the arrival of the rest # of the line. But after some time when we didn't received it we need # to make a decision. - if self._last_line_part != b'': - if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore'))): - self._force_line_print = True - self._print(self._last_line_part) - self.handle_possible_pc_address_in_line(self._last_line_part) - self.check_gdbstub_trigger(self._last_line_part) - # It is possible that the incomplete line cuts in half the PC - # address. A small buffer is kept and will be used the next time - # handle_possible_pc_address_in_line is invoked to avoid this problem. - # MATCH_PCADDR matches 10 character long addresses. Therefore, we - # keep the last 9 characters. - self._pc_address_buffer = self._last_line_part[-9:] - # GDB sequence can be cut in half also. GDB sequence is 7 - # characters long, therefore, we save the last 6 characters. - self._gdb_buffer = self._last_line_part[-6:] - self._last_line_part = b'' + force_print_or_matched = any(( + self._force_line_print, + (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore'))) + )) + if self._last_line_part != b'' and force_print_or_matched: + self._force_line_print = True + self.logger.print(self._last_line_part) + self.handle_possible_pc_address_in_line(self._last_line_part) + self.check_gdb_stub_and_run(self._last_line_part) + # It is possible that the incomplete line cuts in half the PC + # address. A small buffer is kept and will be used the next time + # handle_possible_pc_address_in_line is invoked to avoid this problem. + # MATCH_PCADDR matches 10 character long addresses. Therefore, we + # keep the last 9 characters. + self._pc_address_buffer = self._last_line_part[-9:] + # GDB sequence can be cut in half also. GDB sequence is 7 + # characters long, therefore, we save the last 6 characters. + self.gdb_helper.gdb_buffer = self._last_line_part[-6:] + self._last_line_part = b'' # else: keeping _last_line_part and it will be processed the next time # handle_serial_input is invoked - def handle_possible_pc_address_in_line(self, line): - # type: (bytes) -> None + def handle_possible_pc_address_in_line(self, line): # type: (bytes) -> None line = self._pc_address_buffer + line self._pc_address_buffer = b'' - if self.enable_address_decoding: - for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')): - self.lookup_pc_address(m.group()) + if not self.enable_address_decoding: + return + for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')): + translation = lookup_pc_address(m.group(), self.toolchain_prefix, self.elf_file) + if translation: + self.logger.print(translation, console_printer=yellow_print) def __enter__(self): # type: () -> None @@ -308,29 +321,9 @@ class Monitor(object): def __exit__(self, *args, **kwargs): # type: ignore """ Use 'with self' to temporarily disable monitoring behaviour """ self.console_reader.start() - self.serial_reader.gdb_exit = self.gdb_exit # write gdb_exit flag + self.serial_reader.gdb_exit = self.gdb_helper.gdb_exit # write gdb_exit flag self.serial_reader.start() - def prompt_next_action(self, reason): # type: (str) -> None - self.console.setup() # set up console to trap input characters - try: - red_print('--- {}'.format(reason)) - red_print(self.console_parser.get_next_action_text()) - - k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc. - while k == CTRL_T: - k = self.console.getkey() - finally: - self.console.cleanup() - ret = self.console_parser.parse_next_action_key(k) - if ret is not None: - cmd = ret[1] - if cmd == CMD_STOP: - # the stop command should be handled last - self.event_queue.put(ret) - else: - self.cmd_queue.put(ret) - def run_make(self, target): # type: (str) -> None with self: if isinstance(self.make, list): @@ -344,43 +337,10 @@ class Monitor(object): except KeyboardInterrupt: p.wait() if p.returncode != 0: - self.prompt_next_action('Build failed') + prompt_next_action('Build failed', self.console, self.console_parser, self.event_queue, + self.cmd_queue) else: - self.output_enable(True) - - def lookup_pc_address(self, pc_addr): # type: (str) -> None - cmd = ['%saddr2line' % self.toolchain_prefix, - '-pfiaC', '-e', self.elf_file, pc_addr] - try: - translation = subprocess.check_output(cmd, cwd='.') - if b'?? ??:0' not in translation: - self._print(translation.decode(), console_printer=yellow_print) - except OSError as e: - red_print('%s: %s' % (' '.join(cmd), e)) - - def check_gdbstub_trigger(self, line): # type: (bytes) -> None - line = self._gdb_buffer + line - self._gdb_buffer = b'' - m = re.search(b'\\$(T..)#(..)', line) # look for a gdb "reason" for a break - if m is not None: - try: - chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF - calc_chsum = int(m.group(2), 16) - except ValueError: - return # payload wasn't valid hex digits - if chsum == calc_chsum: - if self.websocket_client: - yellow_print('Communicating through WebSocket') - self.websocket_client.send({'event': 'gdb_stub', - 'port': self.serial.port, - 'prog': self.elf_file}) - yellow_print('Waiting for debug finished event') - self.websocket_client.wait([('event', 'debug_finished')]) - yellow_print('Communications through WebSocket is finished') - else: - self.run_gdb() - else: - red_print('Malformed gdb message... calculated checksum %02x received %02x' % (chsum, calc_chsum)) + self.logger.output_enabled = True def check_coredump_trigger_before_print(self, line): # type: (bytes) -> None if self._decode_coredumps == COREDUMP_DECODE_DISABLE: @@ -395,7 +355,7 @@ class Monitor(object): yellow_print('Core dump started (further output muted)') self._reading_coredump = COREDUMP_READING self._coredump_buffer = b'' - self._output_enabled = False + self.logger.output_enabled = False return if COREDUMP_UART_END in line: @@ -410,16 +370,16 @@ class Monitor(object): self._coredump_buffer += line.replace(b'\r', b'') + b'\n' new_buffer_len_kb = len(self._coredump_buffer) // kb if new_buffer_len_kb > buffer_len_kb: - yellow_print('Received %3d kB...' % (new_buffer_len_kb), newline='\r') + yellow_print('Received %3d kB...' % new_buffer_len_kb, newline='\r') def check_coredump_trigger_after_print(self): # type: () -> None if self._decode_coredumps == COREDUMP_DECODE_DISABLE: return # Re-enable output after the last line of core dump has been consumed - if not self._output_enabled and self._reading_coredump == COREDUMP_DONE: + if not self.logger.output_enabled and self._reading_coredump == COREDUMP_DONE: self._reading_coredump = COREDUMP_IDLE - self._output_enabled = True + self.logger.output_enabled = True self._coredump_buffer = b'' def process_coredump(self): # type: () -> None @@ -436,7 +396,7 @@ class Monitor(object): coredump_file.flush() if self.websocket_client: - self._output_enabled = True + self.logger.output_enabled = True yellow_print('Communicating through WebSocket') self.websocket_client.send({'event': 'coredump', 'file': coredump_file.name, @@ -453,14 +413,14 @@ class Monitor(object): self.elf_file ] output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) - self._output_enabled = True - self._print(output) - self._output_enabled = False # Will be reenabled in check_coredump_trigger_after_print + self.logger.output_enabled = True + self.logger.print(output) + self.logger.output_enabled = False # Will be reenabled in check_coredump_trigger_after_print except subprocess.CalledProcessError as e: yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output)) - self._output_enabled = True - self._print(COREDUMP_UART_START + b'\n') - self._print(self._coredump_buffer) + self.logger.output_enabled = True + self.logger.print(COREDUMP_UART_START + b'\n') + self.logger.print(self._coredump_buffer) # end line will be printed in handle_serial_input finally: if coredump_file is not None: @@ -478,155 +438,17 @@ class Monitor(object): yellow_print('Stack dump detected') if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line: - self._output_enabled = False + self.logger.output_enabled = False if self._reading_panic == PANIC_READING: self._panic_buffer += line.replace(b'\r', b'') + b'\n' if self._reading_panic == PANIC_READING and PANIC_END in line: self._reading_panic = PANIC_IDLE - self._output_enabled = True - self.process_panic_output(self._panic_buffer) + self.logger.output_enabled = True + self.gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target) self._panic_buffer = b'' - def process_panic_output(self, panic_output): # type: (bytes) -> None - panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'tools', 'gdb_panic_server.py') - panic_output_file = None - try: - # On Windows, the temporary file can't be read unless it is closed. - # Set delete=False and delete the file manually later. - with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file: - panic_output_file.write(panic_output) - panic_output_file.flush() - - cmd = [self.toolchain_prefix + 'gdb', - '--batch', '-n', - self.elf_file, - '-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\"" - .format(python=sys.executable, - script=panic_output_decode_script, - target=self.target, - output_file=panic_output_file.name), - '-ex', 'bt'] - - output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) - yellow_print('\nBacktrace:\n\n') - self._print(output) - except subprocess.CalledProcessError as e: - yellow_print('Failed to run gdb_panic_server.py script: {}\n{}\n\n'.format(e, e.output)) - self._print(panic_output) - finally: - if panic_output_file is not None: - try: - os.unlink(panic_output_file.name) - except OSError as e: - yellow_print('Couldn\'t remove temporary panic output file ({})'.format(e)) - - def run_gdb(self): # type: () -> None - with self: # disable console control - normal_print('') - try: - cmd = ['%sgdb' % self.toolchain_prefix, - '-ex', 'set serial baud %d' % self.serial.baudrate, - '-ex', 'target remote %s' % self.serial.port, - self.elf_file] - - # Here we handling GDB as a process - # Open GDB process - try: - process = subprocess.Popen(cmd, cwd='.') - except KeyboardInterrupt: - pass - - # We ignore Ctrl+C interrupt form external process abd wait responce util GDB will be finished. - while True: - try: - process.wait() - break - except KeyboardInterrupt: - pass # We ignore the Ctrl+C - self.gdb_exit = True - - except OSError as e: - red_print('%s: %s' % (' '.join(cmd), e)) - except KeyboardInterrupt: - pass # happens on Windows, maybe other OSes - finally: - try: - # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns... - process.terminate() - except Exception: - pass - try: - # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode - subprocess.call(['stty', 'sane']) - except Exception: - pass # don't care if there's no stty, we tried... - - def output_enable(self, enable): # type: (bool) -> None - self._output_enabled = enable - - def output_toggle(self): # type: () -> None - self._output_enabled = not self._output_enabled - - yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format( - self._output_enabled)) - - def toggle_logging(self): # type: () -> None - if self._log_file: - self.stop_logging() - else: - self.start_logging() - - def toggle_timestamps(self): # type: () -> None - self._timestamps = not self._timestamps - - def start_logging(self): # type: () -> None - if not self._log_file: - name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0], - datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - try: - self._log_file = open(name, 'wb+') - yellow_print('\nLogging is enabled into file {}'.format(name)) - except Exception as e: - red_print('\nLog file {} cannot be created: {}'.format(name, e)) - - def stop_logging(self): # type: () -> None - if self._log_file: - try: - name = self._log_file.name - self._log_file.close() - yellow_print('\nLogging is disabled and file {} has been closed'.format(name)) - except Exception as e: - red_print('\nLog file cannot be closed: {}'.format(e)) - finally: - self._log_file = None - - def _print(self, string, console_printer=None): # type: (AnyStr, Optional[Callable]) -> None - if console_printer is None: - console_printer = self.console.write_bytes - if self._timestamps and (self._output_enabled or self._log_file): - t = datetime.datetime.now().strftime(self._timestamp_format) - # "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines. - if isinstance(string, type(u'')): - search_patt = '\n' - replacement = '\n' + t + ' ' - else: - search_patt = b'\n' # type: ignore - replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore - string = string.replace(search_patt, replacement) - if self._output_enabled: - console_printer(string) - if self._log_file: - try: - if isinstance(string, type(u'')): - string = string.encode() # type: ignore - self._log_file.write(string) # type: ignore - except Exception as e: - red_print('\nCannot write to file: {}'.format(e)) - # don't fill-up the screen with the previous errors (probably consequent prints would fail also) - self.stop_logging() - def handle_commands(self, cmd, chip): # type: (int, str) -> None config = get_chip_config(chip) reset_delay = config['reset'] @@ -645,17 +467,18 @@ class Monitor(object): time.sleep(reset_delay) self.serial.setRTS(high) self.serial.setDTR(self.serial.dtr) # usbser.sys workaround - self.output_enable(low) + self.logger.output_enabled = True elif cmd == CMD_MAKE: self.run_make('encrypted-flash' if self.encrypted else 'flash') elif cmd == CMD_APP_FLASH: self.run_make('encrypted-app-flash' if self.encrypted else 'app-flash') elif cmd == CMD_OUTPUT_TOGGLE: - self.output_toggle() + self.logger.output_toggle() elif cmd == CMD_TOGGLE_LOGGING: - self.toggle_logging() + self.logger.toggle_logging() elif cmd == CMD_TOGGLE_TIMESTAMPS: - self.toggle_timestamps() + self.logger.toggle_timestamps() + self.logger.toggle_logging() elif cmd == CMD_ENTER_BOOT: self.serial.setDTR(high) # IO0=HIGH self.serial.setRTS(low) # EN=LOW, chip in reset diff --git a/tools/idf_monitor_base/__init__.py b/tools/idf_monitor_base/__init__.py index 735491d5dd..2e5a11856f 100644 --- a/tools/idf_monitor_base/__init__.py +++ b/tools/idf_monitor_base/__init__.py @@ -1,5 +1,16 @@ import re +from serial.tools import miniterm + +from .console_parser import ConsoleParser +from .constants import CMD_STOP, CTRL_T +from .output_helpers import red_print + +try: + import queue # noqa +except ImportError: + import Queue as queue # type: ignore # noqa + # regex matches an potential PC value (0x4xxxxxxx) MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE) @@ -33,3 +44,25 @@ PANIC_READING = 1 # panic handler decoding options PANIC_DECODE_DISABLE = 'disable' PANIC_DECODE_BACKTRACE = 'backtrace' + + +def prompt_next_action(reason, console, console_parser, event_queue, cmd_queue): + # type: (str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue) -> None + console.setup() # set up console to trap input characters + try: + red_print('--- {}'.format(reason)) + red_print(console_parser.get_next_action_text()) + + k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc. + while k == CTRL_T: + k = console.getkey() + finally: + console.cleanup() + ret = console_parser.parse_next_action_key(k) + if ret is not None: + cmd = ret[1] + if cmd == CMD_STOP: + # the stop command should be handled last + event_queue.put(ret) + else: + cmd_queue.put(ret) diff --git a/tools/idf_monitor_base/console_parser.py b/tools/idf_monitor_base/console_parser.py index 2e8b815c5b..8db2b71fdd 100644 --- a/tools/idf_monitor_base/console_parser.py +++ b/tools/idf_monitor_base/console_parser.py @@ -19,7 +19,7 @@ try: except ImportError: pass -import serial.tools.miniterm as miniterm +from serial.tools import miniterm from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_A, CTRL_F, CTRL_H, CTRL_I, CTRL_L, CTRL_P, diff --git a/tools/idf_monitor_base/gdbhelper.py b/tools/idf_monitor_base/gdbhelper.py new file mode 100644 index 0000000000..1a6140387f --- /dev/null +++ b/tools/idf_monitor_base/gdbhelper.py @@ -0,0 +1,131 @@ +import os +import re +import subprocess +import sys +import tempfile + +from .output_helpers import Logger, normal_print, red_print, yellow_print +from .web_socket_client import WebSocketClient + + +class GDBHelper: + def __init__(self, toolchain_prefix, websocket_client, elf_file, port, baud_rate): + # type: (str, WebSocketClient, str, int, int) -> None + self._gdb_buffer = b'' # type: bytes + self._gdb_exit = False # type: bool + self.toolchain_prefix = toolchain_prefix + self.websocket_client = websocket_client + self.elf_file = elf_file + self.port = port + self.baud_rate = baud_rate + + @property + def gdb_buffer(self): # type: () -> bytes + return self._gdb_buffer + + @gdb_buffer.setter + def gdb_buffer(self, value): # type: (bytes) -> None + self._gdb_buffer = value + + @property + def gdb_exit(self): # type: () -> bool + return self._gdb_exit + + @gdb_exit.setter + def gdb_exit(self, value): # type: (bool) -> None + self._gdb_exit = value + + def run_gdb(self): + # type: () -> None + normal_print('') + try: + cmd = ['%sgdb' % self.toolchain_prefix, + '-ex', 'set serial baud %d' % self.baud_rate, + '-ex', 'target remote %s' % self.port, + self.elf_file] + # Here we handling GDB as a process + # Open GDB process + try: + process = subprocess.Popen(cmd, cwd='.') + except KeyboardInterrupt: + pass + # We ignore Ctrl+C interrupt form external process abd wait response util GDB will be finished. + while True: + try: + process.wait() + break + except KeyboardInterrupt: + pass # We ignore the Ctrl+C + self.gdb_exit = True + except OSError as e: + red_print('%s: %s' % (' '.join(cmd), e)) + except KeyboardInterrupt: + pass # happens on Windows, maybe other OSes + finally: + try: + # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns... + process.terminate() + except Exception: # noqa + pass + try: + # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode + subprocess.call(['stty', 'sane']) + except Exception: # noqa + pass # don't care if there's no stty, we tried... + + def check_gdb_stub_trigger(self, line): + # type: (bytes) -> bool + line = self.gdb_buffer + line + self.gdb_buffer = b'' + m = re.search(b'\\$(T..)#(..)', line) # look for a gdb "reason" for a break + if m is not None: + try: + chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF + calc_chsum = int(m.group(2), 16) + except ValueError: # payload wasn't valid hex digits + return False + if chsum == calc_chsum: + if self.websocket_client: + yellow_print('Communicating through WebSocket') + self.websocket_client.send({'event': 'gdb_stub', + 'port': self.port, + 'prog': self.elf_file}) + yellow_print('Waiting for debug finished event') + self.websocket_client.wait([('event', 'debug_finished')]) + yellow_print('Communications through WebSocket is finished') + else: + return True + else: + red_print('Malformed gdb message... calculated checksum %02x received %02x' % (chsum, calc_chsum)) + return False + + def process_panic_output(self, panic_output, logger, target): # type: (bytes, Logger, str) -> None + panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'gdb_panic_server.py') + panic_output_file = None + try: + # On Windows, the temporary file can't be read unless it is closed. + # Set delete=False and delete the file manually later. + with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file: + panic_output_file.write(panic_output) + panic_output_file.flush() + cmd = [self.toolchain_prefix + 'gdb', + '--batch', '-n', + self.elf_file, + '-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\"" + .format(python=sys.executable, + script=panic_output_decode_script, + target=target, + output_file=panic_output_file.name), + '-ex', 'bt'] + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) + yellow_print('\nBacktrace:\n\n') + logger.print(output) # noqa: E999 + except subprocess.CalledProcessError as e: + yellow_print('Failed to run gdb_panic_server.py script: {}\n{}\n\n'.format(e, e.output)) + logger.print(panic_output) + finally: + if panic_output_file is not None: + try: + os.unlink(panic_output_file.name) + except OSError as e: + yellow_print('Couldn\'t remove temporary panic output file ({})'.format(e)) diff --git a/tools/idf_monitor_base/output_helpers.py b/tools/idf_monitor_base/output_helpers.py index 07a8c5b343..6454114b42 100644 --- a/tools/idf_monitor_base/output_helpers.py +++ b/tools/idf_monitor_base/output_helpers.py @@ -13,35 +13,137 @@ # limitations under the License. +import datetime +import os +import subprocess import sys +from serial.tools import miniterm + try: - from typing import Optional + from typing import BinaryIO, Callable, Optional, Union # noqa except ImportError: pass -# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated) +# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated) ANSI_RED = '\033[1;31m' ANSI_YELLOW = '\033[0;33m' ANSI_NORMAL = '\033[0m' -def color_print(message, color, newline='\n'): - # type: (str, str, Optional[str]) -> None +def color_print(message, color, newline='\n'): # type: (str, str, Optional[str]) -> None """ Print a message to stderr with colored highlighting """ sys.stderr.write('%s%s%s%s' % (color, message, ANSI_NORMAL, newline)) -def normal_print(message): - # type: (str) -> None +def normal_print(message): # type: (str) -> None sys.stderr.write(ANSI_NORMAL + message) -def yellow_print(message, newline='\n'): - # type: (str, Optional[str]) -> None +def yellow_print(message, newline='\n'): # type: (str, Optional[str]) -> None color_print(message, ANSI_YELLOW, newline) -def red_print(message, newline='\n'): - # type: (str, Optional[str]) -> None +def red_print(message, newline='\n'): # type: (str, Optional[str]) -> None color_print(message, ANSI_RED, newline) + + +def lookup_pc_address(pc_addr, toolchain_prefix, elf_file): # type: (str, str, str) -> Optional[str] + cmd = ['%saddr2line' % toolchain_prefix, '-pfiaC', '-e', elf_file, pc_addr] + + try: + translation = subprocess.check_output(cmd, cwd='.') + if b'?? ??:0' not in translation: + return translation.decode() + except OSError as e: + red_print('%s: %s' % (' '.join(cmd), e)) + return None + + +class Logger: + def __init__(self, elf_file, console, timestamps, timestamp_format): + # type: (str, miniterm.Console, bool, str) -> None + self.log_file = None # type: Optional[BinaryIO] + self._output_enabled = True # type: bool + self.elf_file = elf_file + self.console = console + self.timestamps = timestamps + self.timestamp_format = timestamp_format + + @property + def output_enabled(self): # type: () -> bool + return self._output_enabled + + @output_enabled.setter + def output_enabled(self, value): # type: (bool) -> None + self._output_enabled = value + + @property + def log_file(self): # type: () -> Optional[BinaryIO] + return self._log_file + + @log_file.setter + def log_file(self, value): # type: (Optional[BinaryIO]) -> None + self._log_file = value + + def toggle_logging(self): # type: () -> None + if self._log_file: + self.stop_logging() + else: + self.start_logging() + + def toggle_timestamps(self): # type: () -> None + self.timestamps = not self.timestamps + + def start_logging(self): # type: () -> None + if not self._log_file: + name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0], + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) + try: + self.log_file = open(name, 'wb+') + yellow_print('\nLogging is enabled into file {}'.format(name)) + except Exception as e: + red_print('\nLog file {} cannot be created: {}'.format(name, e)) + + def stop_logging(self): # type: () -> None + if self._log_file: + try: + name = self._log_file.name + self._log_file.close() + yellow_print('\nLogging is disabled and file {} has been closed'.format(name)) + except Exception as e: + red_print('\nLog file cannot be closed: {}'.format(e)) + finally: + self._log_file = None + + def print(self, string, console_printer=None): # noqa: E999 + # type: (Union[str, bytes], Optional[Callable]) -> None + if console_printer is None: + console_printer = self.console.write_bytes + + if self.timestamps and (self._output_enabled or self._log_file): + t = datetime.datetime.now().strftime(self.timestamp_format) + # "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines. + if isinstance(string, type(u'')): + search_patt = '\n' + replacement = '\n' + t + ' ' + else: + search_patt = b'\n' # type: ignore + replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore + string = string.replace(search_patt, replacement) # type: ignore + if self._output_enabled: + console_printer(string) + if self._log_file: + try: + if isinstance(string, type(u'')): + string = string.encode() # type: ignore + self._log_file.write(string) # type: ignore + except Exception as e: + red_print('\nCannot write to file: {}'.format(e)) + # don't fill-up the screen with the previous errors (probably consequent prints would fail also) + self.stop_logging() + + def output_toggle(self): # type: () -> None + self.output_enabled = not self.output_enabled + yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format( + self.output_enabled))