kopia lustrzana https://github.com/espressif/esp-idf
idf_monitor: improve the responding of console commands
rodzic
df4926b723
commit
a2155ff52d
|
@ -52,6 +52,7 @@ import ctypes
|
|||
import types
|
||||
from distutils.version import StrictVersion
|
||||
from io import open
|
||||
import textwrap
|
||||
|
||||
key_description = miniterm.key_description
|
||||
|
||||
|
@ -67,6 +68,15 @@ CTRL_P = '\x10'
|
|||
CTRL_L = '\x0c'
|
||||
CTRL_RBRACKET = '\x1d' # Ctrl+]
|
||||
|
||||
# Command parsed from console inputs
|
||||
CMD_STOP = 1
|
||||
CMD_RESET = 2
|
||||
CMD_MAKE = 3
|
||||
CMD_APP_FLASH = 4
|
||||
CMD_OUTPUT_TOGGLE = 5
|
||||
CMD_TOGGLE_LOGGING = 6
|
||||
CMD_ENTER_BOOT = 7
|
||||
|
||||
# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated)
|
||||
ANSI_RED = '\033[1;31m'
|
||||
ANSI_YELLOW = '\033[0;33m'
|
||||
|
@ -92,6 +102,7 @@ __version__ = "1.1"
|
|||
TAG_KEY = 0
|
||||
TAG_SERIAL = 1
|
||||
TAG_SERIAL_FLUSH = 2
|
||||
TAG_CMD = 3
|
||||
|
||||
# regex matches an potential PC value (0x4xxxxxxx)
|
||||
MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
|
||||
|
@ -149,10 +160,12 @@ class ConsoleReader(StoppableThread):
|
|||
""" Read input keys from the console and push them to the queue,
|
||||
until stopped.
|
||||
"""
|
||||
def __init__(self, console, event_queue, test_mode):
|
||||
def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
|
||||
super(ConsoleReader, self).__init__()
|
||||
self.console = console
|
||||
self.event_queue = event_queue
|
||||
self.cmd_queue = cmd_queue
|
||||
self.parser = parser
|
||||
self.test_mode = test_mode
|
||||
|
||||
def run(self):
|
||||
|
@ -181,7 +194,15 @@ class ConsoleReader(StoppableThread):
|
|||
except KeyboardInterrupt:
|
||||
c = '\x03'
|
||||
if c is not None:
|
||||
self.event_queue.put((TAG_KEY, c), False)
|
||||
ret = self.parser.parse(c)
|
||||
if ret is not None:
|
||||
(tag, cmd) = ret
|
||||
# stop command should be executed last
|
||||
if tag == TAG_CMD and cmd != CMD_STOP:
|
||||
self.cmd_queue.put(ret)
|
||||
else:
|
||||
self.event_queue.put(ret)
|
||||
|
||||
finally:
|
||||
self.console.cleanup()
|
||||
|
||||
|
@ -204,6 +225,110 @@ class ConsoleReader(StoppableThread):
|
|||
fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
|
||||
|
||||
|
||||
class ConsoleParser(object):
|
||||
|
||||
def __init__(self, eol="CRLF"):
|
||||
self.translate_eol = {
|
||||
"CRLF": lambda c: c.replace("\n", "\r\n"),
|
||||
"CR": lambda c: c.replace("\n", "\r"),
|
||||
"LF": lambda c: c.replace("\r", "\n"),
|
||||
}[eol]
|
||||
self.menu_key = CTRL_T
|
||||
self.exit_key = CTRL_RBRACKET
|
||||
self._pressed_menu_key = False
|
||||
|
||||
def parse(self, key):
|
||||
ret = None
|
||||
if self._pressed_menu_key:
|
||||
ret = self._handle_menu_key(key)
|
||||
elif key == self.menu_key:
|
||||
self._pressed_menu_key = True
|
||||
elif key == self.exit_key:
|
||||
ret = (TAG_CMD, CMD_STOP)
|
||||
else:
|
||||
key = self.translate_eol(key)
|
||||
ret = (TAG_KEY, key)
|
||||
return ret
|
||||
|
||||
def _handle_menu_key(self, c):
|
||||
ret = None
|
||||
if c == self.exit_key or c == self.menu_key: # send verbatim
|
||||
ret = (TAG_KEY, c)
|
||||
elif c in [CTRL_H, 'h', 'H', '?']:
|
||||
red_print(self.get_help_text())
|
||||
elif c == CTRL_R: # Reset device via RTS
|
||||
ret = (TAG_CMD, CMD_RESET)
|
||||
elif c == CTRL_F: # Recompile & upload
|
||||
ret = (TAG_CMD, CMD_MAKE)
|
||||
elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
|
||||
# "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
|
||||
# instead
|
||||
ret = (TAG_CMD, CMD_APP_FLASH)
|
||||
elif c == CTRL_Y: # Toggle output display
|
||||
ret = (TAG_CMD, CMD_OUTPUT_TOGGLE)
|
||||
elif c == CTRL_L: # Toggle saving output into file
|
||||
ret = (TAG_CMD, CMD_TOGGLE_LOGGING)
|
||||
elif c == CTRL_P:
|
||||
yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
|
||||
# to fast trigger pause without press menu key
|
||||
ret = (TAG_CMD, CMD_ENTER_BOOT)
|
||||
else:
|
||||
red_print('--- unknown menu character {} --'.format(key_description(c)))
|
||||
|
||||
self._pressed_menu_key = False
|
||||
return ret
|
||||
|
||||
def get_help_text(self):
|
||||
text = """\
|
||||
--- idf_monitor ({version}) - ESP-IDF monitor tool
|
||||
--- based on miniterm from pySerial
|
||||
---
|
||||
--- {exit:8} Exit program
|
||||
--- {menu:8} Menu escape key, followed by:
|
||||
--- Menu keys:
|
||||
--- {menu:14} Send the menu character itself to remote
|
||||
--- {exit:14} Send the exit character itself to remote
|
||||
--- {reset:14} Reset target board via RTS line
|
||||
--- {makecmd:14} Build & flash project
|
||||
--- {appmake:14} Build & flash app only
|
||||
--- {output:14} Toggle output display
|
||||
--- {log:14} Toggle saving output into file
|
||||
--- {pause:14} Reset target into bootloader to pause app via RTS line
|
||||
""".format(version=__version__,
|
||||
exit=key_description(self.exit_key),
|
||||
menu=key_description(self.menu_key),
|
||||
reset=key_description(CTRL_R),
|
||||
makecmd=key_description(CTRL_F),
|
||||
appmake=key_description(CTRL_A) + ' (or A)',
|
||||
output=key_description(CTRL_Y),
|
||||
log=key_description(CTRL_L),
|
||||
pause=key_description(CTRL_P))
|
||||
return textwrap.dedent(text)
|
||||
|
||||
def get_next_action_text(self):
|
||||
text = """\
|
||||
--- Press {} to exit monitor.
|
||||
--- Press {} to build & flash project.
|
||||
--- Press {} to build & flash app.
|
||||
--- Press any other key to resume monitor (resets target).
|
||||
""".format(key_description(self.exit_key),
|
||||
key_description(CTRL_F),
|
||||
key_description(CTRL_A))
|
||||
return textwrap.dedent(text)
|
||||
|
||||
def parse_next_action_key(self, c):
|
||||
ret = None
|
||||
if c == self.exit_key:
|
||||
ret = (TAG_CMD, CMD_STOP)
|
||||
elif c == CTRL_F: # Recompile & upload
|
||||
ret = (TAG_CMD, CMD_MAKE)
|
||||
elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
|
||||
# "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
|
||||
# instead
|
||||
ret = (TAG_CMD, CMD_APP_FLASH)
|
||||
return ret
|
||||
|
||||
|
||||
class SerialReader(StoppableThread):
|
||||
""" Read serial data from the serial port and push to the
|
||||
event queue, until stopped.
|
||||
|
@ -313,6 +438,7 @@ class Monitor(object):
|
|||
def __init__(self, serial_instance, elf_file, print_filter, make="make", toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol="CRLF"):
|
||||
super(Monitor, self).__init__()
|
||||
self.event_queue = queue.Queue()
|
||||
self.cmd_queue = queue.Queue()
|
||||
self.console = miniterm.Console()
|
||||
if os.name == 'nt':
|
||||
sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
|
||||
|
@ -331,7 +457,8 @@ class Monitor(object):
|
|||
|
||||
socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
|
||||
self.serial = serial_instance
|
||||
self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
|
||||
self.console_parser = ConsoleParser(eol)
|
||||
self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser, socket_mode)
|
||||
self.serial_reader = SerialReader(self.serial, self.event_queue)
|
||||
self.elf_file = elf_file
|
||||
if not os.path.exists(make):
|
||||
|
@ -339,17 +466,8 @@ class Monitor(object):
|
|||
else:
|
||||
self.make = make
|
||||
self.toolchain_prefix = toolchain_prefix
|
||||
self.menu_key = CTRL_T
|
||||
self.exit_key = CTRL_RBRACKET
|
||||
|
||||
self.translate_eol = {
|
||||
"CRLF": lambda c: c.replace("\n", "\r\n"),
|
||||
"CR": lambda c: c.replace("\n", "\r"),
|
||||
"LF": lambda c: c.replace("\r", "\n"),
|
||||
}[eol]
|
||||
|
||||
# internal state
|
||||
self._pressed_menu_key = False
|
||||
self._last_line_part = b""
|
||||
self._gdb_buffer = b""
|
||||
self._pc_address_buffer = b""
|
||||
|
@ -368,9 +486,24 @@ class Monitor(object):
|
|||
self.serial_reader.start()
|
||||
try:
|
||||
while self.console_reader.alive and self.serial_reader.alive:
|
||||
(event_tag, data) = self.event_queue.get()
|
||||
if event_tag == TAG_KEY:
|
||||
self.handle_key(data)
|
||||
try:
|
||||
item = self.cmd_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
try:
|
||||
item = self.event_queue.get(False, 0.001)
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
(event_tag, data) = item
|
||||
if event_tag == TAG_CMD:
|
||||
self.handle_commands(data)
|
||||
elif event_tag == TAG_KEY:
|
||||
try:
|
||||
self.serial.write(codecs.encode(data))
|
||||
except serial.SerialException:
|
||||
pass # this shouldn't happen, but sometimes port has closed in serial thread
|
||||
except UnicodeEncodeError:
|
||||
pass # this can happen if a non-ascii character was passed, ignoring
|
||||
elif event_tag == TAG_SERIAL:
|
||||
self.handle_serial_input(data)
|
||||
if self._invoke_processing_last_line_timer is not None:
|
||||
|
@ -400,24 +533,6 @@ class Monitor(object):
|
|||
pass
|
||||
sys.stderr.write(ANSI_NORMAL + "\n")
|
||||
|
||||
def handle_key(self, key):
|
||||
if self._pressed_menu_key:
|
||||
self.handle_menu_key(key)
|
||||
self._pressed_menu_key = False
|
||||
elif key == self.menu_key:
|
||||
self._pressed_menu_key = True
|
||||
elif key == self.exit_key:
|
||||
self.console_reader.stop()
|
||||
self.serial_reader.stop()
|
||||
else:
|
||||
try:
|
||||
key = self.translate_eol(key)
|
||||
self.serial.write(codecs.encode(key))
|
||||
except serial.SerialException:
|
||||
pass # this shouldn't happen, but sometimes port has closed in serial thread
|
||||
except UnicodeEncodeError:
|
||||
pass # this can happen if a non-ascii character was passed, ignoring
|
||||
|
||||
def handle_serial_input(self, data, finalize_line=False):
|
||||
sp = data.split(b'\n')
|
||||
if self._last_line_part != b"":
|
||||
|
@ -429,7 +544,7 @@ class Monitor(object):
|
|||
self._last_line_part = sp.pop()
|
||||
for line in sp:
|
||||
if line != b"":
|
||||
if self._serial_check_exit and line == self.exit_key.encode('latin-1'):
|
||||
if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
|
||||
raise SerialStopException()
|
||||
if self._force_line_print or self._line_matcher.match(line.decode(errors="ignore")):
|
||||
self._print(line + b'\n')
|
||||
|
@ -465,65 +580,6 @@ class Monitor(object):
|
|||
for m in re.finditer(MATCH_PCADDR, line.decode(errors="ignore")):
|
||||
self.lookup_pc_address(m.group())
|
||||
|
||||
def handle_menu_key(self, c):
|
||||
if c == self.exit_key or c == self.menu_key: # send verbatim
|
||||
self.serial.write(codecs.encode(c))
|
||||
elif c in [CTRL_H, 'h', 'H', '?']:
|
||||
red_print(self.get_help_text())
|
||||
elif c == CTRL_R: # Reset device via RTS
|
||||
self.serial.setRTS(True)
|
||||
time.sleep(0.2)
|
||||
self.serial.setRTS(False)
|
||||
self.output_enable(True)
|
||||
elif c == CTRL_F: # Recompile & upload
|
||||
self.run_make("flash")
|
||||
elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only
|
||||
# "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used
|
||||
# instead
|
||||
self.run_make("app-flash")
|
||||
elif c == CTRL_Y: # Toggle output display
|
||||
self.output_toggle()
|
||||
elif c == CTRL_L: # Toggle saving output into file
|
||||
self.toggle_logging()
|
||||
elif c == CTRL_P:
|
||||
yellow_print("Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart")
|
||||
# to fast trigger pause without press menu key
|
||||
self.serial.setDTR(False) # IO0=HIGH
|
||||
self.serial.setRTS(True) # EN=LOW, chip in reset
|
||||
time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
|
||||
self.serial.setDTR(True) # IO0=LOW
|
||||
self.serial.setRTS(False) # EN=HIGH, chip out of reset
|
||||
time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
|
||||
self.serial.setDTR(False) # IO0=HIGH, done
|
||||
else:
|
||||
red_print('--- unknown menu character {} --'.format(key_description(c)))
|
||||
|
||||
def get_help_text(self):
|
||||
return """
|
||||
--- idf_monitor ({version}) - ESP-IDF monitor tool
|
||||
--- based on miniterm from pySerial
|
||||
---
|
||||
--- {exit:8} Exit program
|
||||
--- {menu:8} Menu escape key, followed by:
|
||||
--- Menu keys:
|
||||
--- {menu:14} Send the menu character itself to remote
|
||||
--- {exit:14} Send the exit character itself to remote
|
||||
--- {reset:14} Reset target board via RTS line
|
||||
--- {makecmd:14} Build & flash project
|
||||
--- {appmake:14} Build & flash app only
|
||||
--- {output:14} Toggle output display
|
||||
--- {log:14} Toggle saving output into file
|
||||
--- {pause:14} Reset target into bootloader to pause app via RTS line
|
||||
""".format(version=__version__,
|
||||
exit=key_description(self.exit_key),
|
||||
menu=key_description(self.menu_key),
|
||||
reset=key_description(CTRL_R),
|
||||
makecmd=key_description(CTRL_F),
|
||||
appmake=key_description(CTRL_A) + ' (or A)',
|
||||
output=key_description(CTRL_Y),
|
||||
log=key_description(CTRL_L),
|
||||
pause=key_description(CTRL_P))
|
||||
|
||||
def __enter__(self):
|
||||
""" Use 'with self' to temporarily disable monitoring behaviour """
|
||||
self.serial_reader.stop()
|
||||
|
@ -537,25 +593,22 @@ class Monitor(object):
|
|||
def prompt_next_action(self, reason):
|
||||
self.console.setup() # set up console to trap input characters
|
||||
try:
|
||||
red_print("""
|
||||
--- {}
|
||||
--- Press {} to exit monitor.
|
||||
--- Press {} to build & flash project.
|
||||
--- Press {} to build & flash app.
|
||||
--- Press any other key to resume monitor (resets target).""".format(reason,
|
||||
key_description(self.exit_key),
|
||||
key_description(CTRL_F),
|
||||
key_description(CTRL_A)))
|
||||
red_print("--- {}".format(reason))
|
||||
red_print(self.console_parser.get_next_action_text())
|
||||
|
||||
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
|
||||
while k == CTRL_T:
|
||||
k = self.console.getkey()
|
||||
finally:
|
||||
self.console.cleanup()
|
||||
if k == self.exit_key:
|
||||
self.event_queue.put((TAG_KEY, k))
|
||||
elif k in [CTRL_F, CTRL_A]:
|
||||
self.event_queue.put((TAG_KEY, self.menu_key))
|
||||
self.event_queue.put((TAG_KEY, k))
|
||||
ret = self.console_parser.parse_next_action_key(k)
|
||||
if ret is not None:
|
||||
cmd = ret[1]
|
||||
if cmd == CMD_STOP:
|
||||
# the stop command should be handled last
|
||||
self.event_queue.put(ret)
|
||||
else:
|
||||
self.cmd_queue.put(ret)
|
||||
|
||||
def run_make(self, target):
|
||||
with self:
|
||||
|
@ -676,6 +729,34 @@ class Monitor(object):
|
|||
# don't fill-up the screen with the previous errors (probably consequent prints would fail also)
|
||||
self.stop_logging()
|
||||
|
||||
def handle_commands(self, cmd):
|
||||
if cmd == CMD_STOP:
|
||||
self.console_reader.stop()
|
||||
self.serial_reader.stop()
|
||||
elif cmd == CMD_RESET:
|
||||
self.serial.setRTS(True)
|
||||
time.sleep(0.2)
|
||||
self.serial.setRTS(False)
|
||||
self.output_enable(True)
|
||||
elif cmd == CMD_MAKE:
|
||||
self.run_make("flash")
|
||||
elif cmd == CMD_APP_FLASH:
|
||||
self.run_make("app-flash")
|
||||
elif cmd == CMD_OUTPUT_TOGGLE:
|
||||
self.output_toggle()
|
||||
elif cmd == CMD_TOGGLE_LOGGING:
|
||||
self.toggle_logging()
|
||||
elif cmd == CMD_ENTER_BOOT:
|
||||
self.serial.setDTR(False) # IO0=HIGH
|
||||
self.serial.setRTS(True) # EN=LOW, chip in reset
|
||||
time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
|
||||
self.serial.setDTR(True) # IO0=LOW
|
||||
self.serial.setRTS(False) # EN=HIGH, chip out of reset
|
||||
time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
|
||||
self.serial.setDTR(False) # IO0=HIGH, done
|
||||
else:
|
||||
raise RuntimeError("Bad command data %d" % (cmd))
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
|
||||
|
@ -748,9 +829,9 @@ def main():
|
|||
yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
|
||||
p=serial_instance))
|
||||
yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
|
||||
key_description(monitor.exit_key),
|
||||
key_description(monitor.menu_key),
|
||||
key_description(monitor.menu_key),
|
||||
key_description(monitor.console_parser.exit_key),
|
||||
key_description(monitor.console_parser.menu_key),
|
||||
key_description(monitor.console_parser.menu_key),
|
||||
key_description(CTRL_H)))
|
||||
if args.print_filter != DEFAULT_PRINT_FILTER:
|
||||
yellow_print('--- Print filter: {} ---'.format(args.print_filter))
|
||||
|
|
Ładowanie…
Reference in New Issue