kopia lustrzana https://github.com/micropython/micropython-lib
aiorepl: Add support for raw mode (ctrl-a).
Provides support for mpremote features like cp and mount. Signed-off-by: Andrew Leech <andrew.leech@planetinnovation.com.au>pull/756/head
rodzic
10c9281dad
commit
f672baa92b
|
@ -160,17 +160,14 @@ async def task(g=None, prompt="--> "):
|
|||
else:
|
||||
cmd = cmd[:-1]
|
||||
sys.stdout.write("\x08 \x08")
|
||||
elif c == CHAR_CTRL_A:
|
||||
await raw_repl(s, g)
|
||||
break
|
||||
elif c == CHAR_CTRL_B:
|
||||
continue
|
||||
elif c == CHAR_CTRL_C:
|
||||
if paste:
|
||||
break
|
||||
if pc == CHAR_CTRL_C and time.ticks_diff(t, pt) < 20:
|
||||
# Two very quick Ctrl-C (faster than a human
|
||||
# typing) likely means mpremote trying to
|
||||
# escape.
|
||||
asyncio.new_event_loop()
|
||||
return
|
||||
sys.stdout.write("\n")
|
||||
break
|
||||
elif c == CHAR_CTRL_D:
|
||||
|
@ -240,3 +237,89 @@ async def task(g=None, prompt="--> "):
|
|||
cmd += b
|
||||
finally:
|
||||
micropython.kbd_intr(3)
|
||||
|
||||
|
||||
async def raw_paste(s, g, window=512):
|
||||
sys.stdout.write("R\x01") # supported
|
||||
sys.stdout.write(bytearray([window & 0xFF, window >> 8, 0x01]).decode())
|
||||
eof = False
|
||||
idx = 0
|
||||
buff = bytearray(window)
|
||||
file = b""
|
||||
while not eof:
|
||||
for idx in range(window):
|
||||
b = await s.read(1)
|
||||
c = ord(b)
|
||||
if c == CHAR_CTRL_C or c == CHAR_CTRL_D:
|
||||
# end of file
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
if c == CHAR_CTRL_C:
|
||||
raise KeyboardInterrupt
|
||||
file += buff[:idx]
|
||||
eof = True
|
||||
break
|
||||
buff[idx] = c
|
||||
|
||||
if not eof:
|
||||
file += buff
|
||||
sys.stdout.write("\x01") # indicate window available to host
|
||||
|
||||
return file
|
||||
|
||||
|
||||
async def raw_repl(s: asyncio.StreamReader, g: dict):
|
||||
heading = "raw REPL; CTRL-B to exit\n"
|
||||
line = ""
|
||||
sys.stdout.write(heading)
|
||||
|
||||
while True:
|
||||
line = ""
|
||||
sys.stdout.write(">")
|
||||
while True:
|
||||
b = await s.read(1)
|
||||
c = ord(b)
|
||||
if c == CHAR_CTRL_A:
|
||||
rline = line
|
||||
line = ""
|
||||
|
||||
if len(rline) == 2 and ord(rline[0]) == CHAR_CTRL_E:
|
||||
if rline[1] == "A":
|
||||
line = await raw_paste(s, g)
|
||||
break
|
||||
else:
|
||||
# reset raw REPL
|
||||
sys.stdout.write(heading)
|
||||
sys.stdout.write(">")
|
||||
continue
|
||||
elif c == CHAR_CTRL_B:
|
||||
# exit raw REPL
|
||||
sys.stdout.write("\n")
|
||||
return 0
|
||||
elif c == CHAR_CTRL_C:
|
||||
# clear line
|
||||
line = ""
|
||||
elif c == CHAR_CTRL_D:
|
||||
# entry finished
|
||||
# indicate reception of command
|
||||
sys.stdout.write("OK")
|
||||
break
|
||||
else:
|
||||
# let through any other raw 8-bit value
|
||||
line += b
|
||||
|
||||
if len(line) == 0:
|
||||
# Normally used to trigger soft-reset but stay in raw mode.
|
||||
# Fake it for aiorepl / mpremote.
|
||||
sys.stdout.write("Ignored: soft reboot\n")
|
||||
sys.stdout.write(heading)
|
||||
|
||||
try:
|
||||
result = exec(line, g)
|
||||
if result is not None:
|
||||
sys.stdout.write(repr(result))
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
except Exception as ex:
|
||||
print(line)
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
sys.print_exception(ex, sys.stdout)
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
metadata(
|
||||
version="0.1.1",
|
||||
version="0.2.0",
|
||||
description="Provides an asynchronous REPL that can run concurrently with an asyncio, also allowing await expressions.",
|
||||
)
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue