From 2452dc57f0a38f0956cca4d636b0f6d181cc1a35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Ga=C5=88o?= Date: Fri, 14 May 2021 11:20:38 +0200 Subject: [PATCH] Tools: extracted functionality out of idf_monitor --- tools/idf_monitor.py | 377 +++--------------- .../idf_monitor_base/ansi_color_converter.py | 8 +- tools/idf_monitor_base/argument_parser.py | 120 ++++++ tools/idf_monitor_base/console_parser.py | 10 +- tools/idf_monitor_base/console_reader.py | 6 +- tools/idf_monitor_base/constants.py | 18 +- tools/idf_monitor_base/coredump.py | 3 +- tools/idf_monitor_base/gdbhelper.py | 3 +- tools/idf_monitor_base/logger.py | 134 +++++++ tools/idf_monitor_base/output_helpers.py | 99 +---- tools/idf_monitor_base/serial_handler.py | 195 +++++++++ tools/idf_monitor_base/serial_reader.py | 32 +- tools/idf_monitor_base/stoppable_thread.py | 6 +- 13 files changed, 555 insertions(+), 456 deletions(-) create mode 100644 tools/idf_monitor_base/argument_parser.py create mode 100644 tools/idf_monitor_base/logger.py create mode 100644 tools/idf_monitor_base/serial_handler.py diff --git a/tools/idf_monitor.py b/tools/idf_monitor.py index af90b568bf..32f0525071 100755 --- a/tools/idf_monitor.py +++ b/tools/idf_monitor.py @@ -32,43 +32,19 @@ from __future__ import division, print_function, unicode_literals -import argparse import codecs import os import re -import subprocess import threading import time from builtins import bytes, object try: - from typing import List, Optional, Union # noqa + from typing import Any, List, Optional, Union # noqa except ImportError: pass -from idf_monitor_base.chip_specific_config import get_chip_config -from idf_monitor_base.console_parser import ConsoleParser, prompt_next_action -from idf_monitor_base.console_reader import ConsoleReader -from idf_monitor_base.constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, - CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H, - DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, MATCH_PCADDR, - PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE, - PANIC_READING, PANIC_STACK_DUMP, PANIC_START, TAG_CMD, TAG_KEY, TAG_SERIAL, - TAG_SERIAL_FLUSH) -from idf_monitor_base.coredump import COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO, CoreDump -from idf_monitor_base.exceptions import SerialStopException -from idf_monitor_base.gdbhelper import GDBHelper -from idf_monitor_base.line_matcher import LineMatcher -from idf_monitor_base.output_helpers import Logger, lookup_pc_address, normal_print, yellow_print -from idf_monitor_base.serial_reader import SerialReader -from idf_monitor_base.web_socket_client import WebSocketClient -from serial.tools import miniterm - -try: - import queue # noqa -except ImportError: - import Queue as queue # type: ignore # noqa - +import queue import shlex import sys @@ -76,6 +52,24 @@ import serial import serial.tools.list_ports # Windows console stuff from idf_monitor_base.ansi_color_converter import get_converter +from idf_monitor_base.argument_parser import get_parser +from idf_monitor_base.console_parser import ConsoleParser +from idf_monitor_base.console_reader import ConsoleReader +from idf_monitor_base.constants import (CTRL_C, CTRL_H, DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, + ESPPORT_ENVIRON, EVENT_QUEUE_TIMEOUT, GDB_EXIT_TIMEOUT, + GDB_UART_CONTINUE_COMMAND, LAST_LINE_THREAD_INTERVAL, MAKEFLAGS_ENVIRON, + PANIC_DECODE_DISABLE, PANIC_IDLE, TAG_CMD, TAG_KEY, TAG_SERIAL, + TAG_SERIAL_FLUSH) +from idf_monitor_base.coredump import COREDUMP_DECODE_INFO, CoreDump +from idf_monitor_base.exceptions import SerialStopException +from idf_monitor_base.gdbhelper import GDBHelper +from idf_monitor_base.line_matcher import LineMatcher +from idf_monitor_base.logger import Logger +from idf_monitor_base.output_helpers import normal_print, yellow_print +from idf_monitor_base.serial_handler import SerialHandler, run_make +from idf_monitor_base.serial_reader import SerialReader +from idf_monitor_base.web_socket_client import WebSocketClient +from serial.tools import miniterm key_description = miniterm.key_description @@ -111,14 +105,13 @@ class Monitor(object): self.event_queue = queue.Queue() # type: queue.Queue self.cmd_queue = queue.Queue() # type: queue.Queue self.console = miniterm.Console() - self.enable_address_decoding = enable_address_decoding sys.stderr = get_converter(sys.stderr, decode_output=True) self.console.output = get_converter(self.console.output) self.console.byte_output = get_converter(self.console.byte_output) - socket_mode = serial_instance.port.startswith( - 'socket://') # testing hook - data from serial can make exit the monitor + # testing hook - data from serial can make exit the monitor + socket_mode = serial_instance.port.startswith('socket://') self.serial = serial_instance self.console_parser = ConsoleParser(eol) @@ -126,32 +119,22 @@ class Monitor(object): socket_mode) self.serial_reader = SerialReader(self.serial, self.event_queue) self.elf_file = elf_file - if not os.path.exists(make): - # allow for possibility the "make" arg is a list of arguments (for idf.py) - self.make = shlex.split(make) # type: Union[str, List[str]] - else: - self.make = make - self.encrypted = encrypted - self.toolchain_prefix = toolchain_prefix - self.websocket_client = websocket_client + + # allow for possibility the "make" arg is a list of arguments (for idf.py) + self.make = make if os.path.exists(make) else shlex.split(make) # type: Any[Union[str, List[str]], str] self.target = target - # internal state - self._last_line_part = b'' - self._pc_address_buffer = b'' self._line_matcher = LineMatcher(print_filter) - self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer] - self._force_line_print = False - self._serial_check_exit = socket_mode - self._decode_panic = decode_panic - self._reading_panic = PANIC_IDLE - self._panic_buffer = b'' - self.start_cmd_sent = False - self.gdb_helper = GDBHelper(self.toolchain_prefix, self.websocket_client, self.elf_file, self.serial.port, + self.gdb_helper = GDBHelper(toolchain_prefix, websocket_client, self.elf_file, self.serial.port, self.serial.baudrate) + self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format, b'', enable_address_decoding, + toolchain_prefix) + self.coredump = CoreDump(decode_coredumps, self.event_queue, self.logger, websocket_client, self.elf_file) + self.serial_handler = SerialHandler(b'', socket_mode, self.logger, decode_panic, PANIC_IDLE, b'', target, + False, False, self.serial, encrypted) - self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format) - self.coredump = CoreDump(decode_coredumps, self.event_queue, self.logger, self.websocket_client, self.elf_file) + # internal state + self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer] def invoke_processing_last_line(self): # type: () -> None @@ -162,18 +145,17 @@ class Monitor(object): self.console_reader.start() self.serial_reader.start() self.gdb_helper.gdb_exit = False - self.start_cmd_sent = False + self.serial_handler.start_cmd_sent = False try: while self.console_reader.alive and self.serial_reader.alive: try: if self.gdb_helper.gdb_exit: self.gdb_helper.gdb_exit = False - - time.sleep(0.3) + time.sleep(GDB_EXIT_TIMEOUT) try: # Continue the program after exit from the GDB - self.serial.write(codecs.encode('+$c#63')) - self.start_cmd_sent = True + self.serial.write(codecs.encode(GDB_UART_CONTINUE_COMMAND)) + self.serial_handler.start_cmd_sent = True except serial.SerialException: pass # this shouldn't happen, but sometimes port has closed in serial thread except UnicodeEncodeError: @@ -183,14 +165,14 @@ class Monitor(object): item = self.cmd_queue.get_nowait() except queue.Empty: try: - item = self.event_queue.get(True, 0.03) + item = self.event_queue.get(timeout=EVENT_QUEUE_TIMEOUT) except queue.Empty: continue - (event_tag, data) = item - + event_tag, data = item if event_tag == TAG_CMD: - self.handle_commands(data, self.target) + self.serial_handler.handle_commands(data, self.target, self.run_make, self.console_reader, + self.serial_reader) elif event_tag == TAG_KEY: try: self.serial.write(codecs.encode(data)) @@ -199,10 +181,12 @@ class Monitor(object): except UnicodeEncodeError: pass # this can happen if a non-ascii character was passed, ignoring elif event_tag == TAG_SERIAL: - self.handle_serial_input(data) + self.serial_handler.handle_serial_input(data, self.console_parser, self.coredump, + self.gdb_helper, self._line_matcher, + self.check_gdb_stub_and_run) if self._invoke_processing_last_line_timer is not None: self._invoke_processing_last_line_timer.cancel() - self._invoke_processing_last_line_timer = threading.Timer(0.1, + self._invoke_processing_last_line_timer = threading.Timer(LAST_LINE_THREAD_INTERVAL, self.invoke_processing_last_line) self._invoke_processing_last_line_timer.start() # If no further data is received in the next short period @@ -211,13 +195,15 @@ class Monitor(object): # the last line. This is fix for handling lines sent # without EOL. elif event_tag == TAG_SERIAL_FLUSH: - self.handle_serial_input(data, finalize_line=True) + self.serial_handler.handle_serial_input(data, self.console_parser, self.coredump, + self.gdb_helper, self._line_matcher, + self.check_gdb_stub_and_run, finalize_line=True) else: raise RuntimeError('Bad event data %r' % ((event_tag, data),)) except KeyboardInterrupt: try: yellow_print('To exit from IDF monitor please use \"Ctrl+]\"') - self.serial.write(codecs.encode('\x03')) + self.serial.write(codecs.encode(CTRL_C)) except serial.SerialException: pass # this shouldn't happen, but sometimes port has closed in serial thread except UnicodeEncodeError: @@ -234,80 +220,10 @@ class Monitor(object): # Cancelling _invoke_processing_last_line_timer is not # important here because receiving empty data doesn't matter. self._invoke_processing_last_line_timer = None - except Exception: + except Exception: # noqa pass normal_print('\n') - def check_gdb_stub_and_run(self, line): # type: (bytes) -> None - if self.gdb_helper.check_gdb_stub_trigger(line): - with self: # disable console control - self.gdb_helper.run_gdb() - - def handle_serial_input(self, data, finalize_line=False): - # type: (bytes, bool) -> None - # Remove "+" after Continue command - if self.start_cmd_sent is True: - self.start_cmd_sent = False - pos = data.find(b'+') - if pos != -1: - data = data[(pos + 1):] - - sp = data.split(b'\n') - if self._last_line_part != b'': - # add unprocessed part from previous "data" to the first line - sp[0] = self._last_line_part + sp[0] - self._last_line_part = b'' - if sp[-1] != b'': - # last part is not a full line - self._last_line_part = sp.pop() - for line in sp: - if line == b'': - continue - if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'): - raise SerialStopException() - self.check_panic_decode_trigger(line) - with self.coredump.check(line): - if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')): - self.logger.print(line + b'\n') - self.handle_possible_pc_address_in_line(line) - self.check_gdb_stub_and_run(line) - self._force_line_print = False - # Now we have the last part (incomplete line) in _last_line_part. By - # default we don't touch it and just wait for the arrival of the rest - # of the line. But after some time when we didn't received it we need - # to make a decision. - force_print_or_matched = any(( - self._force_line_print, - (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore'))) - )) - if self._last_line_part != b'' and force_print_or_matched: - self._force_line_print = True - self.logger.print(self._last_line_part) - self.handle_possible_pc_address_in_line(self._last_line_part) - self.check_gdb_stub_and_run(self._last_line_part) - # It is possible that the incomplete line cuts in half the PC - # address. A small buffer is kept and will be used the next time - # handle_possible_pc_address_in_line is invoked to avoid this problem. - # MATCH_PCADDR matches 10 character long addresses. Therefore, we - # keep the last 9 characters. - self._pc_address_buffer = self._last_line_part[-9:] - # GDB sequence can be cut in half also. GDB sequence is 7 - # characters long, therefore, we save the last 6 characters. - self.gdb_helper.gdb_buffer = self._last_line_part[-6:] - self._last_line_part = b'' - # else: keeping _last_line_part and it will be processed the next time - # handle_serial_input is invoked - - def handle_possible_pc_address_in_line(self, line): # type: (bytes) -> None - line = self._pc_address_buffer + line - self._pc_address_buffer = b'' - if not self.enable_address_decoding: - return - for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')): - translation = lookup_pc_address(m.group(), self.toolchain_prefix, self.elf_file) - if translation: - self.logger.print(translation, console_printer=yellow_print) - def __enter__(self): # type: () -> None """ Use 'with self' to temporarily disable monitoring behaviour """ @@ -320,186 +236,20 @@ class Monitor(object): self.serial_reader.gdb_exit = self.gdb_helper.gdb_exit # write gdb_exit flag self.serial_reader.start() + def check_gdb_stub_and_run(self, line): # type: (bytes) -> None + if self.gdb_helper.check_gdb_stub_trigger(line): + with self: # disable console control + self.gdb_helper.run_gdb() + def run_make(self, target): # type: (str) -> None with self: - if isinstance(self.make, list): - popen_args = self.make + [target] - else: - popen_args = [self.make, target] - yellow_print('Running %s...' % ' '.join(popen_args)) - p = subprocess.Popen(popen_args, env=os.environ) - try: - p.wait() - except KeyboardInterrupt: - p.wait() - if p.returncode != 0: - prompt_next_action('Build failed', self.console, self.console_parser, self.event_queue, - self.cmd_queue) - else: - self.logger.output_enabled = True - - def check_panic_decode_trigger(self, line): # type: (bytes) -> None - if self._decode_panic == PANIC_DECODE_DISABLE: - return - - if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')): - self._reading_panic = PANIC_READING - yellow_print('Stack dump detected') - - if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line: - self.logger.output_enabled = False - - if self._reading_panic == PANIC_READING: - self._panic_buffer += line.replace(b'\r', b'') + b'\n' - - if self._reading_panic == PANIC_READING and PANIC_END in line: - self._reading_panic = PANIC_IDLE - self.logger.output_enabled = True - self.gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target) - self._panic_buffer = b'' - - def handle_commands(self, cmd, chip): # type: (int, str) -> None - config = get_chip_config(chip) - reset_delay = config['reset'] - enter_boot_set = config['enter_boot_set'] - enter_boot_unset = config['enter_boot_unset'] - - high = False - low = True - - if cmd == CMD_STOP: - self.console_reader.stop() - self.serial_reader.stop() - elif cmd == CMD_RESET: - self.serial.setRTS(low) - self.serial.setDTR(self.serial.dtr) # usbser.sys workaround - time.sleep(reset_delay) - self.serial.setRTS(high) - self.serial.setDTR(self.serial.dtr) # usbser.sys workaround - self.logger.output_enabled = True - elif cmd == CMD_MAKE: - self.run_make('encrypted-flash' if self.encrypted else 'flash') - elif cmd == CMD_APP_FLASH: - self.run_make('encrypted-app-flash' if self.encrypted else 'app-flash') - elif cmd == CMD_OUTPUT_TOGGLE: - self.logger.output_toggle() - elif cmd == CMD_TOGGLE_LOGGING: - self.logger.toggle_logging() - elif cmd == CMD_TOGGLE_TIMESTAMPS: - self.logger.toggle_timestamps() - self.logger.toggle_logging() - elif cmd == CMD_ENTER_BOOT: - self.serial.setDTR(high) # IO0=HIGH - self.serial.setRTS(low) # EN=LOW, chip in reset - self.serial.setDTR(self.serial.dtr) # usbser.sys workaround - time.sleep(enter_boot_set) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 - self.serial.setDTR(low) # IO0=LOW - self.serial.setRTS(high) # EN=HIGH, chip out of reset - self.serial.setDTR(self.serial.dtr) # usbser.sys workaround - time.sleep(enter_boot_unset) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 - self.serial.setDTR(high) # IO0=HIGH, done - else: - raise RuntimeError('Bad command data %d' % cmd) # type: ignore + run_make(target, self.make, self.console, self.console_parser, self.event_queue, self.cmd_queue, + self.logger) def main(): # type: () -> None - parser = argparse.ArgumentParser('idf_monitor - a serial output monitor for esp-idf') - - parser.add_argument( - '--port', '-p', - help='Serial port device', - default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0') - ) - - parser.add_argument( - '--disable-address-decoding', '-d', - help="Don't print lines about decoded addresses from the application ELF file", - action='store_true', - default=True if os.environ.get('ESP_MONITOR_DECODE') == 0 else False - ) - - parser.add_argument( - '--baud', '-b', - help='Serial port baud rate', - type=int, - default=os.getenv('IDF_MONITOR_BAUD', os.getenv('MONITORBAUD', 115200))) - - parser.add_argument( - '--make', '-m', - help='Command to run make', - type=str, default='make') - - parser.add_argument( - '--encrypted', - help='Use encrypted targets while running make', - action='store_true') - - parser.add_argument( - '--toolchain-prefix', - help='Triplet prefix to add before cross-toolchain names', - default=DEFAULT_TOOLCHAIN_PREFIX) - - parser.add_argument( - '--eol', - choices=['CR', 'LF', 'CRLF'], - type=lambda c: c.upper(), - help='End of line to use when sending to the serial port', - default='CR') - - parser.add_argument( - 'elf_file', help='ELF file of application', - type=argparse.FileType('rb')) - - parser.add_argument( - '--print_filter', - help='Filtering string', - default=DEFAULT_PRINT_FILTER) - - parser.add_argument( - '--decode-coredumps', - choices=[COREDUMP_DECODE_INFO, COREDUMP_DECODE_DISABLE], - default=COREDUMP_DECODE_INFO, - help='Handling of core dumps found in serial output' - ) - - parser.add_argument( - '--decode-panic', - choices=[PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE], - default=PANIC_DECODE_DISABLE, - help='Handling of panic handler info found in serial output' - ) - - parser.add_argument( - '--target', - help='Target name (used when stack dump decoding is enabled)', - default=os.environ.get('IDF_TARGET', 'esp32') - ) - - parser.add_argument( - '--revision', - help='Revision of the target', - type=int, - default=0 - ) - - parser.add_argument( - '--ws', - default=os.environ.get('ESP_IDF_MONITOR_WS', None), - help='WebSocket URL for communicating with IDE tools for debugging purposes' - ) - - parser.add_argument( - '--timestamps', - help='Add timestamp for each line', - default=False, - action='store_true') - - parser.add_argument( - '--timestamp-format', - default=os.environ.get('ESP_IDF_MONITOR_TIMESTAMP_FORMAT', '%Y-%m-%d %H:%M:%S'), - help='Set a strftime()-compatible timestamp format' - ) + parser = get_parser() args = parser.parse_args() # GDB uses CreateFile to open COM port, which requires the COM name to be r'\\.\COMx' if the COM @@ -513,8 +263,7 @@ def main(): # type: () -> None yellow_print('--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.') yellow_print('--- Using %s instead...' % args.port) - serial_instance = serial.serial_for_url(args.port, args.baud, - do_not_open=True) + serial_instance = serial.serial_for_url(args.port, args.baud, do_not_open=True) serial_instance.dtr = False serial_instance.rts = False args.elf_file.close() # don't need this as a file @@ -524,18 +273,17 @@ def main(): # type: () -> None # all of the child makes we need (the -j argument remains part of # MAKEFLAGS) try: - makeflags = os.environ['MAKEFLAGS'] + makeflags = os.environ[MAKEFLAGS_ENVIRON] makeflags = re.sub(r'--jobserver[^ =]*=[0-9,]+ ?', '', makeflags) - os.environ['MAKEFLAGS'] = makeflags + os.environ[MAKEFLAGS_ENVIRON] = makeflags except KeyError: pass # not running a make jobserver # Pass the actual used port to callee of idf_monitor (e.g. make) through `ESPPORT` environment # variable # To make sure the key as well as the value are str type, by the requirements of subprocess - espport_key = str('ESPPORT') espport_val = str(args.port) - os.environ.update({espport_key: espport_val}) + os.environ.update({ESPPORT_ENVIRON: espport_val}) ws = WebSocketClient(args.ws) if args.ws else None try: @@ -553,7 +301,6 @@ def main(): # type: () -> None key_description(CTRL_H))) if args.print_filter != DEFAULT_PRINT_FILTER: yellow_print('--- Print filter: {} ---'.format(args.print_filter)) - monitor.main_loop() except KeyboardInterrupt: pass diff --git a/tools/idf_monitor_base/ansi_color_converter.py b/tools/idf_monitor_base/ansi_color_converter.py index 906db777d5..154643772e 100644 --- a/tools/idf_monitor_base/ansi_color_converter.py +++ b/tools/idf_monitor_base/ansi_color_converter.py @@ -17,11 +17,7 @@ import os import re import sys from io import TextIOBase - -try: - from typing import Optional, Union -except ImportError: - pass +from typing import Any, Optional, TextIO, Union from .output_helpers import ANSI_NORMAL @@ -44,7 +40,7 @@ if os.name == 'nt': def get_converter(orig_output_method=None, decode_output=False): - # type: (Optional[TextIOBase], bool) -> Union[ANSIColorConverter, Optional[TextIOBase]] + # type: (Any[TextIO, Optional[TextIOBase]], bool) -> Union[ANSIColorConverter, Optional[TextIOBase]] """ Returns an ANSIColorConverter on Windows and the original output method (orig_output_method) on other platforms. The ANSIColorConverter with decode_output=True will decode the bytes before passing them to the output. diff --git a/tools/idf_monitor_base/argument_parser.py b/tools/idf_monitor_base/argument_parser.py new file mode 100644 index 0000000000..149ffba6fc --- /dev/null +++ b/tools/idf_monitor_base/argument_parser.py @@ -0,0 +1,120 @@ +# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os + +from .constants import DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE +from .coredump import COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO + + +def get_parser(): # type: () -> argparse.ArgumentParser + parser = argparse.ArgumentParser('idf_monitor - a serial output monitor for esp-idf') + + parser.add_argument( + '--port', '-p', + help='Serial port device', + default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0') + ) + + parser.add_argument( + '--disable-address-decoding', '-d', + help="Don't print lines about decoded addresses from the application ELF file", + action='store_true', + default=os.environ.get('ESP_MONITOR_DECODE') == 0 + ) + + parser.add_argument( + '--baud', '-b', + help='Serial port baud rate', + type=int, + default=os.getenv('IDF_MONITOR_BAUD', os.getenv('MONITORBAUD', 115200))) + + parser.add_argument( + '--make', '-m', + help='Command to run make', + type=str, default='make') + + parser.add_argument( + '--encrypted', + help='Use encrypted targets while running make', + action='store_true') + + parser.add_argument( + '--toolchain-prefix', + help='Triplet prefix to add before cross-toolchain names', + default=DEFAULT_TOOLCHAIN_PREFIX) + + parser.add_argument( + '--eol', + choices=['CR', 'LF', 'CRLF'], + type=lambda c: c.upper(), + help='End of line to use when sending to the serial port', + default='CR') + + parser.add_argument( + 'elf_file', help='ELF file of application', + type=argparse.FileType('rb')) + + parser.add_argument( + '--print_filter', + help='Filtering string', + default=DEFAULT_PRINT_FILTER) + + parser.add_argument( + '--decode-coredumps', + choices=[COREDUMP_DECODE_INFO, COREDUMP_DECODE_DISABLE], + default=COREDUMP_DECODE_INFO, + help='Handling of core dumps found in serial output' + ) + + parser.add_argument( + '--decode-panic', + choices=[PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE], + default=PANIC_DECODE_DISABLE, + help='Handling of panic handler info found in serial output' + ) + + parser.add_argument( + '--target', + help='Target name (used when stack dump decoding is enabled)', + default=os.environ.get('IDF_TARGET', 'esp32') + ) + + parser.add_argument( + '--revision', + help='Revision of the target', + type=int, + default=0 + ) + + parser.add_argument( + '--ws', + default=os.environ.get('ESP_IDF_MONITOR_WS', None), + help='WebSocket URL for communicating with IDE tools for debugging purposes' + ) + + parser.add_argument( + '--timestamps', + help='Add timestamp for each line', + default=False, + action='store_true') + + parser.add_argument( + '--timestamp-format', + default=os.environ.get('ESP_IDF_MONITOR_TIMESTAMP_FORMAT', '%Y-%m-%d %H:%M:%S'), + help='Set a strftime()-compatible timestamp format' + ) + + return parser diff --git a/tools/idf_monitor_base/console_parser.py b/tools/idf_monitor_base/console_parser.py index d1495a8c41..50071cbba5 100644 --- a/tools/idf_monitor_base/console_parser.py +++ b/tools/idf_monitor_base/console_parser.py @@ -12,15 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import textwrap - -try: - from typing import Optional -except ImportError: - pass - - import queue +import textwrap +from typing import Optional from serial.tools import miniterm diff --git a/tools/idf_monitor_base/console_reader.py b/tools/idf_monitor_base/console_reader.py index 19dc1332c0..82e305d20f 100644 --- a/tools/idf_monitor_base/console_reader.py +++ b/tools/idf_monitor_base/console_reader.py @@ -14,6 +14,7 @@ import os +import queue import time from serial.tools.miniterm import Console @@ -22,11 +23,6 @@ from .console_parser import ConsoleParser from .constants import CMD_STOP, TAG_CMD from .stoppable_thread import StoppableThread -try: - import queue -except ImportError: - import Queue as queue # type: ignore - class ConsoleReader(StoppableThread): """ Read input keys from the console and push them to the queue, diff --git a/tools/idf_monitor_base/constants.py b/tools/idf_monitor_base/constants.py index 63efaddd87..be68b02930 100644 --- a/tools/idf_monitor_base/constants.py +++ b/tools/idf_monitor_base/constants.py @@ -18,6 +18,7 @@ import re # Control-key characters CTRL_A = '\x01' CTRL_B = '\x02' +CTRL_C = '\x03' CTRL_F = '\x06' CTRL_H = '\x08' CTRL_I = '\x09' @@ -47,7 +48,6 @@ TAG_CMD = 3 __version__ = '1.1' - # paths to scripts PANIC_OUTPUT_DECODE_SCRIPT = os.path.join(os.path.dirname(__file__), '..', 'gdb_panic_server.py') COREDUMP_SCRIPT = os.path.join(os.path.dirname(__file__), '..', '..', 'components', 'espcoredump', 'espcoredump.py') @@ -71,3 +71,19 @@ PANIC_READING = 1 # panic handler decoding options PANIC_DECODE_DISABLE = 'disable' PANIC_DECODE_BACKTRACE = 'backtrace' + +EVENT_QUEUE_TIMEOUT = 0.03 # timeout before raising queue.Empty exception in case of empty event queue + +ESPPORT_ENVIRON = str('ESPPORT') +MAKEFLAGS_ENVIRON = 'MAKEFLAGS' + +GDB_UART_CONTINUE_COMMAND = '+$c#63' +GDB_EXIT_TIMEOUT = 0.3 # time delay between exit and writing GDB_UART_CONTINUE_COMMAND + +# workaround for data sent without EOL +# if no data received during the time, last line is considered finished +LAST_LINE_THREAD_INTERVAL = 0.1 + +MINIMAL_EN_LOW_DELAY = 0.005 +RECONNECT_DELAY = 0.5 # timeout between reconnect tries +CHECK_ALIVE_FLAG_TIMEOUT = 0.25 # timeout for checking alive flags (currently used by serial reader) diff --git a/tools/idf_monitor_base/coredump.py b/tools/idf_monitor_base/coredump.py index 4120a2f62a..555194f325 100644 --- a/tools/idf_monitor_base/coredump.py +++ b/tools/idf_monitor_base/coredump.py @@ -7,7 +7,8 @@ from contextlib import contextmanager from typing import Generator from .constants import COREDUMP_SCRIPT, TAG_KEY -from .output_helpers import Logger, yellow_print +from .logger import Logger +from .output_helpers import yellow_print from .web_socket_client import WebSocketClient # coredump related messages diff --git a/tools/idf_monitor_base/gdbhelper.py b/tools/idf_monitor_base/gdbhelper.py index 198ff3f390..f32d4fb688 100644 --- a/tools/idf_monitor_base/gdbhelper.py +++ b/tools/idf_monitor_base/gdbhelper.py @@ -5,7 +5,8 @@ import sys import tempfile from .constants import PANIC_OUTPUT_DECODE_SCRIPT -from .output_helpers import Logger, normal_print, red_print, yellow_print +from .logger import Logger +from .output_helpers import normal_print, red_print, yellow_print from .web_socket_client import WebSocketClient diff --git a/tools/idf_monitor_base/logger.py b/tools/idf_monitor_base/logger.py new file mode 100644 index 0000000000..c165651c03 --- /dev/null +++ b/tools/idf_monitor_base/logger.py @@ -0,0 +1,134 @@ +# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import os +import re +from typing import BinaryIO, Callable, Optional, Union # noqa: F401 + +from serial.tools import miniterm # noqa: F401 + +from .constants import MATCH_PCADDR +from .output_helpers import lookup_pc_address, red_print, yellow_print + + +class Logger: + def __init__(self, elf_file, console, timestamps, timestamp_format, pc_address_buffer, enable_address_decoding, + toolchain_prefix): + # type: (str, miniterm.Console, bool, str, bytes, bool, str) -> None + self.log_file = None # type: Optional[BinaryIO] + self._output_enabled = True # type: bool + self.elf_file = elf_file + self.console = console + self.timestamps = timestamps + self.timestamp_format = timestamp_format + self._pc_address_buffer = pc_address_buffer + self.enable_address_decoding = enable_address_decoding + self.toolchain_prefix = toolchain_prefix + + @property + def pc_address_buffer(self): # type: () -> bytes + return self._pc_address_buffer + + @pc_address_buffer.setter + def pc_address_buffer(self, value): # type: (bytes) -> None + self._pc_address_buffer = value + + @property + def output_enabled(self): # type: () -> bool + return self._output_enabled + + @output_enabled.setter + def output_enabled(self, value): # type: (bool) -> None + self._output_enabled = value + + @property + def log_file(self): # type: () -> Optional[BinaryIO] + return self._log_file + + @log_file.setter + def log_file(self, value): # type: (Optional[BinaryIO]) -> None + self._log_file = value + + def toggle_logging(self): # type: () -> None + if self._log_file: + self.stop_logging() + else: + self.start_logging() + + def toggle_timestamps(self): # type: () -> None + self.timestamps = not self.timestamps + + def start_logging(self): # type: () -> None + if not self._log_file: + name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0], + datetime.datetime.now().strftime('%Y%m%d%H%M%S')) + try: + self.log_file = open(name, 'wb+') + yellow_print('\nLogging is enabled into file {}'.format(name)) + except Exception as e: # noqa + red_print('\nLog file {} cannot be created: {}'.format(name, e)) + + def stop_logging(self): # type: () -> None + if self._log_file: + try: + name = self._log_file.name + self._log_file.close() + yellow_print('\nLogging is disabled and file {} has been closed'.format(name)) + except Exception as e: # noqa + red_print('\nLog file cannot be closed: {}'.format(e)) + finally: + self._log_file = None + + def print(self, string, console_printer=None): # noqa: E999 + # type: (Union[str, bytes], Optional[Callable]) -> None + if console_printer is None: + console_printer = self.console.write_bytes + + if self.timestamps and (self._output_enabled or self._log_file): + t = datetime.datetime.now().strftime(self.timestamp_format) + # "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines. + if isinstance(string, type(u'')): + search_patt = '\n' + replacement = '\n' + t + ' ' + else: + search_patt = b'\n' # type: ignore + replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore + string = string.replace(search_patt, replacement) # type: ignore + if self._output_enabled: + console_printer(string) + if self._log_file: + try: + if isinstance(string, type(u'')): + string = string.encode() # type: ignore + self._log_file.write(string) # type: ignore + except Exception as e: + red_print('\nCannot write to file: {}'.format(e)) + # don't fill-up the screen with the previous errors (probably consequent prints would fail also) + self.stop_logging() + + def output_toggle(self): # type: () -> None + self.output_enabled = not self.output_enabled + yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format( + self.output_enabled)) + + def handle_possible_pc_address_in_line(self, line): # type: (bytes) -> None + line = self._pc_address_buffer + line + self._pc_address_buffer = b'' + if not self.enable_address_decoding: + return + for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')): + translation = lookup_pc_address(m.group(), self.toolchain_prefix, self.elf_file) + if translation: + self.print(translation, console_printer=yellow_print) diff --git a/tools/idf_monitor_base/output_helpers.py b/tools/idf_monitor_base/output_helpers.py index 6a764e8206..480181e23d 100644 --- a/tools/idf_monitor_base/output_helpers.py +++ b/tools/idf_monitor_base/output_helpers.py @@ -12,17 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime -import os import subprocess import sys - -from serial.tools import miniterm - -try: - from typing import BinaryIO, Callable, Optional, Union # noqa -except ImportError: - pass +from typing import BinaryIO, Callable, Optional, Union # noqa: F401 # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated) ANSI_RED = '\033[1;31m' @@ -57,92 +49,3 @@ def lookup_pc_address(pc_addr, toolchain_prefix, elf_file): # type: (str, str, except OSError as e: red_print('%s: %s' % (' '.join(cmd), e)) return None - - -class Logger: - def __init__(self, elf_file, console, timestamps, timestamp_format): - # type: (str, miniterm.Console, bool, str) -> None - self.log_file = None # type: Optional[BinaryIO] - self._output_enabled = True # type: bool - self.elf_file = elf_file - self.console = console - self.timestamps = timestamps - self.timestamp_format = timestamp_format - - @property - def output_enabled(self): # type: () -> bool - return self._output_enabled - - @output_enabled.setter - def output_enabled(self, value): # type: (bool) -> None - self._output_enabled = value - - @property - def log_file(self): # type: () -> Optional[BinaryIO] - return self._log_file - - @log_file.setter - def log_file(self, value): # type: (Optional[BinaryIO]) -> None - self._log_file = value - - def toggle_logging(self): # type: () -> None - if self._log_file: - self.stop_logging() - else: - self.start_logging() - - def toggle_timestamps(self): # type: () -> None - self.timestamps = not self.timestamps - - def start_logging(self): # type: () -> None - if not self._log_file: - name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0], - datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - try: - self.log_file = open(name, 'wb+') - yellow_print('\nLogging is enabled into file {}'.format(name)) - except Exception as e: - red_print('\nLog file {} cannot be created: {}'.format(name, e)) - - def stop_logging(self): # type: () -> None - if self._log_file: - try: - name = self._log_file.name - self._log_file.close() - yellow_print('\nLogging is disabled and file {} has been closed'.format(name)) - except Exception as e: - red_print('\nLog file cannot be closed: {}'.format(e)) - finally: - self._log_file = None - - def print(self, string, console_printer=None): # noqa: E999 - # type: (Union[str, bytes], Optional[Callable]) -> None - if console_printer is None: - console_printer = self.console.write_bytes - - if self.timestamps and (self._output_enabled or self._log_file): - t = datetime.datetime.now().strftime(self.timestamp_format) - # "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines. - if isinstance(string, type(u'')): - search_patt = '\n' - replacement = '\n' + t + ' ' - else: - search_patt = b'\n' # type: ignore - replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore - string = string.replace(search_patt, replacement) # type: ignore - if self._output_enabled: - console_printer(string) - if self._log_file: - try: - if isinstance(string, type(u'')): - string = string.encode() # type: ignore - self._log_file.write(string) # type: ignore - except Exception as e: - red_print('\nCannot write to file: {}'.format(e)) - # don't fill-up the screen with the previous errors (probably consequent prints would fail also) - self.stop_logging() - - def output_toggle(self): # type: () -> None - self.output_enabled = not self.output_enabled - yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format( - self.output_enabled)) diff --git a/tools/idf_monitor_base/serial_handler.py b/tools/idf_monitor_base/serial_handler.py new file mode 100644 index 0000000000..9281fd40c9 --- /dev/null +++ b/tools/idf_monitor_base/serial_handler.py @@ -0,0 +1,195 @@ +# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import queue # noqa: F401 +import re +import subprocess +import time +from typing import Callable + +import serial # noqa: F401 +from serial.tools import miniterm # noqa: F401 + +from .chip_specific_config import get_chip_config +from .console_parser import ConsoleParser, prompt_next_action # noqa: F401 +from .console_reader import ConsoleReader # noqa: F401 +from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP, + CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE, + PANIC_READING, PANIC_STACK_DUMP, PANIC_START) +from .coredump import CoreDump # noqa: F401 +from .exceptions import SerialStopException # noqa: F401 +from .gdbhelper import GDBHelper # noqa: F401 +from .line_matcher import LineMatcher # noqa: F401 +from .logger import Logger # noqa: F401 +from .output_helpers import yellow_print +from .serial_reader import SerialReader # noqa: F401 + + +def run_make(target, make, console, console_parser, event_queue, cmd_queue, logger): + # type: (str, str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue, Logger) -> None + if isinstance(make, list): + popen_args = make + [target] + else: + popen_args = [make, target] + yellow_print('Running %s...' % ' '.join(popen_args)) + p = subprocess.Popen(popen_args, env=os.environ) + try: + p.wait() + except KeyboardInterrupt: + p.wait() + if p.returncode != 0: + prompt_next_action('Build failed', console, console_parser, event_queue, cmd_queue) + else: + logger.output_enabled = True + + +class SerialHandler: + """ + The class is responsible for buffering serial input and performing corresponding commands. + """ + def __init__(self, last_line_part, serial_check_exit, logger, decode_panic, reading_panic, panic_buffer, target, + force_line_print, start_cmd_sent, serial_instance, encrypted, ): + # type: (bytes, bool, Logger, str, int, bytes,str, bool, bool, serial.Serial, bool) -> None + self._last_line_part = last_line_part + self._serial_check_exit = serial_check_exit + self.logger = logger + self._decode_panic = decode_panic + self._reading_panic = reading_panic + self._panic_buffer = panic_buffer + self.target = target + self._force_line_print = force_line_print + self.start_cmd_sent = start_cmd_sent + self.serial_instance = serial_instance + self.encrypted = encrypted + + def handle_serial_input(self, data, console_parser, coredump, gdb_helper, line_matcher, + check_gdb_stub_and_run, finalize_line=False): + # type: (bytes, ConsoleParser, CoreDump, GDBHelper, LineMatcher, Callable, bool) -> None + # Remove "+" after Continue command + if self.start_cmd_sent: + self.start_cmd_sent = False + pos = data.find(b'+') + if pos != -1: + data = data[(pos + 1):] + + sp = data.split(b'\n') + if self._last_line_part != b'': + # add unprocessed part from previous "data" to the first line + sp[0] = self._last_line_part + sp[0] + self._last_line_part = b'' + if sp[-1] != b'': + # last part is not a full line + self._last_line_part = sp.pop() + for line in sp: + if line == b'': + continue + if self._serial_check_exit and line == console_parser.exit_key.encode('latin-1'): + raise SerialStopException() + self.check_panic_decode_trigger(line, gdb_helper) + with coredump.check(line): + if self._force_line_print or line_matcher.match(line.decode(errors='ignore')): + self.logger.print(line + b'\n') + self.logger.handle_possible_pc_address_in_line(line) + check_gdb_stub_and_run(line) + self._force_line_print = False + # Now we have the last part (incomplete line) in _last_line_part. By + # default we don't touch it and just wait for the arrival of the rest + # of the line. But after some time when we didn't received it we need + # to make a decision. + force_print_or_matched = any(( + self._force_line_print, + (finalize_line and line_matcher.match(self._last_line_part.decode(errors='ignore'))) + )) + if self._last_line_part != b'' and force_print_or_matched: + self._force_line_print = True + self.logger.print(self._last_line_part) + self.logger.handle_possible_pc_address_in_line(self._last_line_part) + check_gdb_stub_and_run(self._last_line_part) + # It is possible that the incomplete line cuts in half the PC + # address. A small buffer is kept and will be used the next time + # handle_possible_pc_address_in_line is invoked to avoid this problem. + # MATCH_PCADDR matches 10 character long addresses. Therefore, we + # keep the last 9 characters. + self.logger.pc_address_buffer = self._last_line_part[-9:] + # GDB sequence can be cut in half also. GDB sequence is 7 + # characters long, therefore, we save the last 6 characters. + gdb_helper.gdb_buffer = self._last_line_part[-6:] + self._last_line_part = b'' + # else: keeping _last_line_part and it will be processed the next time + # handle_serial_input is invoked + + def check_panic_decode_trigger(self, line, gdb_helper): # type: (bytes, GDBHelper) -> None + if self._decode_panic == PANIC_DECODE_DISABLE: + return + + if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')): + self._reading_panic = PANIC_READING + yellow_print('Stack dump detected') + + if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line: + self.logger.output_enabled = False + + if self._reading_panic == PANIC_READING: + self._panic_buffer += line.replace(b'\r', b'') + b'\n' + + if self._reading_panic == PANIC_READING and PANIC_END in line: + self._reading_panic = PANIC_IDLE + self.logger.output_enabled = True + gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target) + self._panic_buffer = b'' + + def handle_commands(self, cmd, chip, run_make_func, console_reader, serial_reader): + # type: (int, str, Callable, ConsoleReader, SerialReader) -> None + config = get_chip_config(chip) + reset_delay = config['reset'] + enter_boot_set = config['enter_boot_set'] + enter_boot_unset = config['enter_boot_unset'] + + high = False + low = True + + if cmd == CMD_STOP: + console_reader.stop() + serial_reader.stop() + elif cmd == CMD_RESET: + self.serial_instance.setRTS(low) + self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround + time.sleep(reset_delay) + self.serial_instance.setRTS(high) + self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround + self.logger.output_enabled = True + elif cmd == CMD_MAKE: + run_make_func('encrypted-flash' if self.encrypted else 'flash') + elif cmd == CMD_APP_FLASH: + run_make_func('encrypted-app-flash' if self.encrypted else 'app-flash') + elif cmd == CMD_OUTPUT_TOGGLE: + self.logger.output_toggle() + elif cmd == CMD_TOGGLE_LOGGING: + self.logger.toggle_logging() + elif cmd == CMD_TOGGLE_TIMESTAMPS: + self.logger.toggle_timestamps() + self.logger.toggle_logging() + elif cmd == CMD_ENTER_BOOT: + self.serial_instance.setDTR(high) # IO0=HIGH + self.serial_instance.setRTS(low) # EN=LOW, chip in reset + self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround + time.sleep(enter_boot_set) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 + self.serial_instance.setDTR(low) # IO0=LOW + self.serial_instance.setRTS(high) # EN=HIGH, chip out of reset + self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround + time.sleep(enter_boot_unset) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 + self.serial_instance.setDTR(high) # IO0=HIGH, done + else: + raise RuntimeError('Bad command data %d' % cmd) # type: ignore diff --git a/tools/idf_monitor_base/serial_reader.py b/tools/idf_monitor_base/serial_reader.py index d704acf8e6..f78a575fa6 100644 --- a/tools/idf_monitor_base/serial_reader.py +++ b/tools/idf_monitor_base/serial_reader.py @@ -12,26 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +import queue import sys import time import serial -from .constants import TAG_SERIAL +from .constants import CHECK_ALIVE_FLAG_TIMEOUT, MINIMAL_EN_LOW_DELAY, RECONNECT_DELAY, TAG_SERIAL from .output_helpers import red_print, yellow_print from .stoppable_thread import StoppableThread -try: - import queue -except ImportError: - import Queue as queue # type: ignore # noqa - class SerialReader(StoppableThread): """ Read serial data from the serial port and push to the event queue, until stopped. """ - def __init__(self, serial_instance, event_queue): # type: (serial.Serial, queue.Queue) -> None super(SerialReader, self).__init__() @@ -42,7 +37,7 @@ class SerialReader(StoppableThread): if not hasattr(self.serial, 'cancel_read'): # enable timeout for checking alive flag, # if cancel_read not available - self.serial.timeout = 0.25 + self.serial.timeout = CHECK_ALIVE_FLAG_TIMEOUT def run(self): # type: () -> None @@ -51,18 +46,23 @@ class SerialReader(StoppableThread): # We can come to this thread at startup or from external application line GDB. # If we come from GDB we would like to continue to run without reset. - self.serial.dtr = True # Non reset state - self.serial.rts = False # IO0=HIGH + high = False + low = True + + self.serial.dtr = low # Non reset state + self.serial.rts = high # IO0=HIGH self.serial.dtr = self.serial.dtr # usbser.sys workaround # Current state not reset the target! self.serial.open() if not self.gdb_exit: - self.serial.dtr = False # Set dtr to reset state (affected by rts) - self.serial.rts = True # Set rts/dtr to the reset state + self.serial.dtr = high # Set dtr to reset state (affected by rts) + self.serial.rts = low # Set rts/dtr to the reset state self.serial.dtr = self.serial.dtr # usbser.sys workaround - time.sleep(0.005) # Add a delay to meet the requirements of minimal EN low time (2ms for ESP32-C3) + + # Add a delay to meet the requirements of minimal EN low time (2ms for ESP32-C3) + time.sleep(MINIMAL_EN_LOW_DELAY) self.gdb_exit = False - self.serial.rts = False # Set rts/dtr to the working state + self.serial.rts = high # Set rts/dtr to the working state self.serial.dtr = self.serial.dtr # usbser.sys workaround try: while self.alive: @@ -71,13 +71,13 @@ class SerialReader(StoppableThread): except (serial.serialutil.SerialException, IOError) as e: data = b'' # self.serial.open() was successful before, therefore, this is an issue related to - # the disapperence of the device + # the disappearance of the device red_print(e) yellow_print('Waiting for the device to reconnect', newline='') self.serial.close() while self.alive: # so that exiting monitor works while waiting try: - time.sleep(0.5) + time.sleep(RECONNECT_DELAY) self.serial.open() break # device connected except serial.serialutil.SerialException: diff --git a/tools/idf_monitor_base/stoppable_thread.py b/tools/idf_monitor_base/stoppable_thread.py index 886ac362f9..38f6b7a01a 100644 --- a/tools/idf_monitor_base/stoppable_thread.py +++ b/tools/idf_monitor_base/stoppable_thread.py @@ -13,11 +13,7 @@ # limitations under the License. import threading - -try: - from typing import Optional -except ImportError: - pass +from typing import Optional class StoppableThread(object):