diff --git a/tools/idf_monitor.py b/tools/idf_monitor.py index f68020613f..5320149dbe 100755 --- a/tools/idf_monitor.py +++ b/tools/idf_monitor.py @@ -52,6 +52,7 @@ import ctypes import types from distutils.version import StrictVersion from io import open +import textwrap key_description = miniterm.key_description @@ -67,6 +68,15 @@ CTRL_P = '\x10' CTRL_L = '\x0c' CTRL_RBRACKET = '\x1d' # Ctrl+] +# Command parsed from console inputs +CMD_STOP = 1 +CMD_RESET = 2 +CMD_MAKE = 3 +CMD_APP_FLASH = 4 +CMD_OUTPUT_TOGGLE = 5 +CMD_TOGGLE_LOGGING = 6 +CMD_ENTER_BOOT = 7 + # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated) ANSI_RED = '\033[1;31m' ANSI_YELLOW = '\033[0;33m' @@ -92,6 +102,7 @@ __version__ = "1.1" TAG_KEY = 0 TAG_SERIAL = 1 TAG_SERIAL_FLUSH = 2 +TAG_CMD = 3 # regex matches an potential PC value (0x4xxxxxxx) MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE) @@ -149,10 +160,12 @@ class ConsoleReader(StoppableThread): """ Read input keys from the console and push them to the queue, until stopped. """ - def __init__(self, console, event_queue, test_mode): + def __init__(self, console, event_queue, cmd_queue, parser, test_mode): super(ConsoleReader, self).__init__() self.console = console self.event_queue = event_queue + self.cmd_queue = cmd_queue + self.parser = parser self.test_mode = test_mode def run(self): @@ -181,7 +194,15 @@ class ConsoleReader(StoppableThread): except KeyboardInterrupt: c = '\x03' if c is not None: - self.event_queue.put((TAG_KEY, c), False) + ret = self.parser.parse(c) + if ret is not None: + (tag, cmd) = ret + # stop command should be executed last + if tag == TAG_CMD and cmd != CMD_STOP: + self.cmd_queue.put(ret) + else: + self.event_queue.put(ret) + finally: self.console.cleanup() @@ -204,6 +225,110 @@ class ConsoleReader(StoppableThread): fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0') +class ConsoleParser(object): + + def __init__(self, eol="CRLF"): + self.translate_eol = { + "CRLF": lambda c: c.replace("\n", "\r\n"), + "CR": lambda c: c.replace("\n", "\r"), + "LF": lambda c: c.replace("\r", "\n"), + }[eol] + self.menu_key = CTRL_T + self.exit_key = CTRL_RBRACKET + self._pressed_menu_key = False + + def parse(self, key): + ret = None + if self._pressed_menu_key: + ret = self._handle_menu_key(key) + elif key == self.menu_key: + self._pressed_menu_key = True + elif key == self.exit_key: + ret = (TAG_CMD, CMD_STOP) + else: + key = self.translate_eol(key) + ret = (TAG_KEY, key) + return ret + + def _handle_menu_key(self, c): + ret = None + if c == self.exit_key or c == self.menu_key: # send verbatim + ret = (TAG_KEY, c) + elif c in [CTRL_H, 'h', 'H', '?']: + red_print(self.get_help_text()) + elif c == CTRL_R: # Reset device via RTS + ret = (TAG_CMD, CMD_RESET) + elif c == CTRL_F: # Recompile & upload + ret = (TAG_CMD, CMD_MAKE) + elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only + # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used + # instead + ret = (TAG_CMD, CMD_APP_FLASH) + elif c == CTRL_Y: # Toggle output display + ret = (TAG_CMD, CMD_OUTPUT_TOGGLE) + elif c == CTRL_L: # Toggle saving output into file + ret = (TAG_CMD, CMD_TOGGLE_LOGGING) + elif c == CTRL_P: + yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart") + # to fast trigger pause without press menu key + ret = (TAG_CMD, CMD_ENTER_BOOT) + else: + red_print('--- unknown menu character {} --'.format(key_description(c))) + + self._pressed_menu_key = False + return ret + + def get_help_text(self): + text = """\ + --- idf_monitor ({version}) - ESP-IDF monitor tool + --- based on miniterm from pySerial + --- + --- {exit:8} Exit program + --- {menu:8} Menu escape key, followed by: + --- Menu keys: + --- {menu:14} Send the menu character itself to remote + --- {exit:14} Send the exit character itself to remote + --- {reset:14} Reset target board via RTS line + --- {makecmd:14} Build & flash project + --- {appmake:14} Build & flash app only + --- {output:14} Toggle output display + --- {log:14} Toggle saving output into file + --- {pause:14} Reset target into bootloader to pause app via RTS line + """.format(version=__version__, + exit=key_description(self.exit_key), + menu=key_description(self.menu_key), + reset=key_description(CTRL_R), + makecmd=key_description(CTRL_F), + appmake=key_description(CTRL_A) + ' (or A)', + output=key_description(CTRL_Y), + log=key_description(CTRL_L), + pause=key_description(CTRL_P)) + return textwrap.dedent(text) + + def get_next_action_text(self): + text = """\ + --- Press {} to exit monitor. + --- Press {} to build & flash project. + --- Press {} to build & flash app. + --- Press any other key to resume monitor (resets target). + """.format(key_description(self.exit_key), + key_description(CTRL_F), + key_description(CTRL_A)) + return textwrap.dedent(text) + + def parse_next_action_key(self, c): + ret = None + if c == self.exit_key: + ret = (TAG_CMD, CMD_STOP) + elif c == CTRL_F: # Recompile & upload + ret = (TAG_CMD, CMD_MAKE) + elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only + # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used + # instead + ret = (TAG_CMD, CMD_APP_FLASH) + return ret + + class SerialReader(StoppableThread): """ Read serial data from the serial port and push to the event queue, until stopped. @@ -313,6 +438,7 @@ class Monitor(object): def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"): super(Monitor, self).__init__() self.event_queue = queue.Queue() + self.cmd_queue = queue.Queue() self.console = miniterm.Console() if os.name == 'nt': sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True) @@ -331,7 +457,8 @@ class Monitor(object): socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor self.serial = serial_instance - self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode) + self.console_parser = ConsoleParser(eol) + self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser, socket_mode) self.serial_reader = SerialReader(self.serial, self.event_queue) self.elf_file = elf_file if not os.path.exists(make): @@ -339,17 +466,8 @@ class Monitor(object): else: self.make = make self.toolchain_prefix = toolchain_prefix - self.menu_key = CTRL_T - self.exit_key = CTRL_RBRACKET - - self.translate_eol = { - "CRLF": lambda c: c.replace("\n", "\r\n"), - "CR": lambda c: c.replace("\n", "\r"), - "LF": lambda c: c.replace("\r", "\n"), - }[eol] # internal state - self._pressed_menu_key = False self._last_line_part = b"" self._gdb_buffer = b"" self._pc_address_buffer = b"" @@ -368,9 +486,24 @@ class Monitor(object): self.serial_reader.start() try: while self.console_reader.alive and self.serial_reader.alive: - (event_tag, data) = self.event_queue.get() - if event_tag == TAG_KEY: - self.handle_key(data) + try: + item = self.cmd_queue.get_nowait() + except queue.Empty: + try: + item = self.event_queue.get(False, 0.001) + except queue.Empty: + continue + + (event_tag, data) = item + if event_tag == TAG_CMD: + self.handle_commands(data) + elif event_tag == TAG_KEY: + try: + self.serial.write(codecs.encode(data)) + except serial.SerialException: + pass # this shouldn't happen, but sometimes port has closed in serial thread + except UnicodeEncodeError: + pass # this can happen if a non-ascii character was passed, ignoring elif event_tag == TAG_SERIAL: self.handle_serial_input(data) if self._invoke_processing_last_line_timer is not None: @@ -400,24 +533,6 @@ class Monitor(object): pass sys.stderr.write(ANSI_NORMAL + "\n") - def handle_key(self, key): - if self._pressed_menu_key: - self.handle_menu_key(key) - self._pressed_menu_key = False - elif key == self.menu_key: - self._pressed_menu_key = True - elif key == self.exit_key: - self.console_reader.stop() - self.serial_reader.stop() - else: - try: - key = self.translate_eol(key) - self.serial.write(codecs.encode(key)) - except serial.SerialException: - pass # this shouldn't happen, but sometimes port has closed in serial thread - except UnicodeEncodeError: - pass # this can happen if a non-ascii character was passed, ignoring - def handle_serial_input(self, data, finalize_line=False): sp = data.split(b'\n') if self._last_line_part != b"": @@ -429,7 +544,7 @@ class Monitor(object): self._last_line_part = sp.pop() for line in sp: if line != b"": - if self._serial_check_exit and line == self.exit_key.encode('latin-1'): + if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'): raise SerialStopException() if self._force_line_print or self._line_matcher.match(line.decode(errors="ignore")): self._print(line + b'\n') @@ -465,65 +580,6 @@ class Monitor(object): for m in re.finditer(MATCH_PCADDR, line.decode(errors="ignore")): self.lookup_pc_address(m.group()) - def handle_menu_key(self, c): - if c == self.exit_key or c == self.menu_key: # send verbatim - self.serial.write(codecs.encode(c)) - elif c in [CTRL_H, 'h', 'H', '?']: - red_print(self.get_help_text()) - elif c == CTRL_R: # Reset device via RTS - self.serial.setRTS(True) - time.sleep(0.2) - self.serial.setRTS(False) - self.output_enable(True) - elif c == CTRL_F: # Recompile & upload - self.run_make("flash") - elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only - # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used - # instead - self.run_make("app-flash") - elif c == CTRL_Y: # Toggle output display - self.output_toggle() - elif c == CTRL_L: # Toggle saving output into file - self.toggle_logging() - elif c == CTRL_P: - yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart") - # to fast trigger pause without press menu key - self.serial.setDTR(False) # IO0=HIGH - self.serial.setRTS(True) # EN=LOW, chip in reset - time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 - self.serial.setDTR(True) # IO0=LOW - self.serial.setRTS(False) # EN=HIGH, chip out of reset - time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 - self.serial.setDTR(False) # IO0=HIGH, done - else: - red_print('--- unknown menu character {} --'.format(key_description(c))) - - def get_help_text(self): - return """ ---- idf_monitor ({version}) - ESP-IDF monitor tool ---- based on miniterm from pySerial ---- ---- {exit:8} Exit program ---- {menu:8} Menu escape key, followed by: ---- Menu keys: ---- {menu:14} Send the menu character itself to remote ---- {exit:14} Send the exit character itself to remote ---- {reset:14} Reset target board via RTS line ---- {makecmd:14} Build & flash project ---- {appmake:14} Build & flash app only ---- {output:14} Toggle output display ---- {log:14} Toggle saving output into file ---- {pause:14} Reset target into bootloader to pause app via RTS line -""".format(version=__version__, - exit=key_description(self.exit_key), - menu=key_description(self.menu_key), - reset=key_description(CTRL_R), - makecmd=key_description(CTRL_F), - appmake=key_description(CTRL_A) + ' (or A)', - output=key_description(CTRL_Y), - log=key_description(CTRL_L), - pause=key_description(CTRL_P)) - def __enter__(self): """ Use 'with self' to temporarily disable monitoring behaviour """ self.serial_reader.stop() @@ -537,25 +593,22 @@ class Monitor(object): def prompt_next_action(self, reason): self.console.setup() # set up console to trap input characters try: - red_print(""" ---- {} ---- Press {} to exit monitor. ---- Press {} to build & flash project. ---- Press {} to build & flash app. ---- Press any other key to resume monitor (resets target).""".format(reason, - key_description(self.exit_key), - key_description(CTRL_F), - key_description(CTRL_A))) + red_print("--- {}".format(reason)) + red_print(self.console_parser.get_next_action_text()) + k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc. while k == CTRL_T: k = self.console.getkey() finally: self.console.cleanup() - if k == self.exit_key: - self.event_queue.put((TAG_KEY, k)) - elif k in [CTRL_F, CTRL_A]: - self.event_queue.put((TAG_KEY, self.menu_key)) - self.event_queue.put((TAG_KEY, k)) + ret = self.console_parser.parse_next_action_key(k) + if ret is not None: + cmd = ret[1] + if cmd == CMD_STOP: + # the stop command should be handled last + self.event_queue.put(ret) + else: + self.cmd_queue.put(ret) def run_make(self, target): with self: @@ -676,6 +729,34 @@ class Monitor(object): # don't fill-up the screen with the previous errors (probably consequent prints would fail also) self.stop_logging() + def handle_commands(self, cmd): + if cmd == CMD_STOP: + self.console_reader.stop() + self.serial_reader.stop() + elif cmd == CMD_RESET: + self.serial.setRTS(True) + time.sleep(0.2) + self.serial.setRTS(False) + self.output_enable(True) + elif cmd == CMD_MAKE: + self.run_make("flash") + elif cmd == CMD_APP_FLASH: + self.run_make("app-flash") + elif cmd == CMD_OUTPUT_TOGGLE: + self.output_toggle() + elif cmd == CMD_TOGGLE_LOGGING: + self.toggle_logging() + elif cmd == CMD_ENTER_BOOT: + self.serial.setDTR(False) # IO0=HIGH + self.serial.setRTS(True) # EN=LOW, chip in reset + time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 + self.serial.setDTR(True) # IO0=LOW + self.serial.setRTS(False) # EN=HIGH, chip out of reset + time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 + self.serial.setDTR(False) # IO0=HIGH, done + else: + raise RuntimeError("Bad command data %d" % (cmd)) + def main(): parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf") @@ -748,9 +829,9 @@ def main(): yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format( p=serial_instance)) yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format( - key_description(monitor.exit_key), - key_description(monitor.menu_key), - key_description(monitor.menu_key), + key_description(monitor.console_parser.exit_key), + key_description(monitor.console_parser.menu_key), + key_description(monitor.console_parser.menu_key), key_description(CTRL_H))) if args.print_filter != DEFAULT_PRINT_FILTER: yellow_print('--- Print filter: {} ---'.format(args.print_filter))